public inbox for [email protected]  
help / color / mirror / Atom feed
From: Filip Janus <[email protected]>
To: lakshmi <[email protected]>
Cc: pgsql-hackers <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Adding compression of temporary files
Date: Fri, 23 Jan 2026 17:40:32 +0100
Message-ID: <CAFjYY+LMTciR=3SLh+8EbAFjumQTrcTKbyU703Srzy3j_yEhSw@mail.gmail.com> (raw)
In-Reply-To: <CAEvyyTj5G3BAFYt=VHJPLFF_meJGmSEk+J4BqM7gGUen8sXchw@mail.gmail.com>
References: <CAFjYY++22tmGDSac_YPA_NLJ4-cEJo7kS3SxXFHf7ZFTnwO-QA@mail.gmail.com>
	<CAFjYY+J7ZcJAoE+K4AoyuiS_N6w8vVCQ6HQ72mPB=i4C+QbAeQ@mail.gmail.com>
	<CAFjYY+KhiuzocMdgEr8wQtCkSaec=Mu_YiLE6N3JfYtQ-Xv=ug@mail.gmail.com>
	<CAFjYY++YjtunZcFt7MFBWwseP_=GS11=V36_xig07enHA0eRjA@mail.gmail.com>
	<CAPpHfdvD-oqh=qvtkD3qWqZrfxt0+PtdwyRYC+faQd-rr+V0mg@mail.gmail.com>
	<[email protected]>
	<CAFjYY+Lhw2tvsiC8_Dbx+K9dMaK=myT5Cck2=8FgLUW5f9Hp-A@mail.gmail.com>
	<udbqvyuxlxk4a64uomo5xmgloemk7jrbeeluhvxbdqxcdv2pt5@who34o73ttmo>
	<CAFjYY+LcBCPe9cboef1GteAPKEiqyTTww7NGxa==roc6QXSOxA@mail.gmail.com>
	<CAFjYY+LJ5_j82PrSxm2RjM0OjwfN9smJ1VKKMOka-25Jg-0ofg@mail.gmail.com>
	<7gbzkoxrpqvj2sgxof4pqirz7pf6wb5vizak3mxrz2zpvrf3pa@lvnwdsy3em56>
	<CAFjYY+LH=MbMpGiYfMtPsANDBAqiDC66hD3tBs_JeVcBYVcmHA@mail.gmail.com>
	<CAFjYY+Jfsy1r4ZP4+RyHZeWE7JyT4LSdVFvA9ck04LFkKd1ebw@mail.gmail.com>
	<CAFjYY+K=s8gL=3SN8puwjSm5ZvUXumsdeB-Z2rwLP5OQNtEF+Q@mail.gmail.com>
	<CAEvyyThUp+Cyv8Se6dxpJvP7w=u1670Xd4k6zeXGVSac7UOm_A@mail.gmail.com>
	<CAFjYY+KjoNLYGFeM4z=DOgG1uDwXD1Z6zyurgz1d9Y-96X=9GA@mail.gmail.com>
	<CAFjYY+JDSpOQwYAfTQQ43=BA=d32XfcAdaPVJgHheV9fQBbLWg@mail.gmail.com>
	<CAN4CZFPmAwOvR8r1gtv=r+4akArZJyjna_Nxgkn_eTcsRs4XPA@mail.gmail.com>
	<CAEvyyTjiUhEUX5jxJkHHncnKSuCmf=MitZLtDt+Z6WsNNo0vCQ@mail.gmail.com>
	<CAEvyyTj5G3BAFYt=VHJPLFF_meJGmSEk+J4BqM7gGUen8sXchw@mail.gmail.com>

Hi all,
Thanks for the feedback and the provided patch.
I've addressed your findings and proposals. Lakshmi's documentation patch
was incorporated.

    -Filip-


st 21. 1. 2026 v 7:30 odesílatel lakshmi <[email protected]> napsal:

> HI all,
> While testing the temp file compression patch,noticed that the new
> temp_file_compression GUC isn't documented yet.I put together a small docs
> patch to add a short description and clarify that the effect of compression
> depends on the workload(for example ,hash join spills may not show visible
> size reduction due to fixed_size chunks).
>
> patch is attached.Happy to adjust the wording if needed.
> thanks,
> lakshmi
>
> On Tue, Jan 20, 2026 at 4:21 PM lakshmi <[email protected]> wrote:
>
>> Hi Filip,
>>
>> I tested both patches on current master using git am -3 .They apply
>> cleanly,build fine,and the temp_file _compression GUC works as expected.
>> Query results are unchanged.
>>
>> For hash join spill test,temp files were created as expected,but the
>> logged size were same for no,lz4,and pglz,which seems consistent with
>> fixed-size fileset chunking.It might be helpful to briefly note this in the
>> documentation to avoid confusion.
>>
>> Thanks for working on this .
>> best regards,
>> lakshmi
>>
>> On Tue, Jan 20, 2026 at 4:10 AM Zsolt Parragi <[email protected]>
>> wrote:
>>
>>> Hello!
>>>
>>> I tried to review the code. It compiled, the test suite passed.
>>>
>>> I noticed two typos:
>>>
>>> buffile.c:77 - "Disaled"
>>> buffile.c:133 - "mathods"
>>>
>>> And a few other small findings:
>>>
>>> buffile.h:35 and buffile.c:63 - same constants defined first as an
>>> Enum and then as #defines - code builds properly without the defines.
>>>
>>> buffile.c:121 - compress_tempfile is defined, set to false at :167,
>>> but never used otherwise
>>>
>>> guc_tables.c:470 - the comment says that pglz isn't supported yet, but
>>> we have a value for it, and I see support for it in the code
>>>
>>> buffile.c:659: (and at other places) if USE_LZ4 is undefined, the
>>> codepath doesn't do anything. I think these ifdefs should follow how
>>> other compression code works, such as wal compression where there's an
>>> #else path with elog(ERROR, ...)
>>> Similarly, maybe there should be an explicit TEMP_NONE_COMPRESSION
>>> branch that does nothing, and the default branch should be an error?
>>>
>>> buffile.c:265: If seek isn't supported/limited, shouldn't there be at
>>> least an assertion about it in BufFileSeek? And tell isn't mentioned,
>>> but it seems to me that tell also doesn't work properly.
>>>
>>


Attachments:

  [application/octet-stream] 0002-Add-regression-tests-for-temporary-file-compression.patch (127.6K, 3-0002-Add-regression-tests-for-temporary-file-compression.patch)
  download | inline diff:
From a5c6b7b0f977ade9884d7350deb9fdf93df312ae Mon Sep 17 00:00:00 2001
From: Filip Janus <[email protected]>
Date: Thu, 31 Jul 2025 14:02:45 +0200
Subject: [PATCH 2/2] Add regression tests for temporary file compression

This commit adds comprehensive regression tests for the transparent
temporary file compression feature.

Test coverage:
- join_hash_lz4.sql: Tests hash join operations with LZ4 compression
- join_hash_pglz.sql: Tests hash join operations with PGLZ compression
- Both tests verify compression works correctly for various hash join scenarios
- Expected output files for validation

Test integration:
- LZ4 tests are conditionally enabled when PostgreSQL is built with --with-lz4
- PGLZ tests are always enabled as PGLZ is built-in
- Tests added to parallel regression test schedule
- GNUmakefile updated to include conditional LZ4 test execution

The tests ensure that compression/decompression works transparently
without affecting query results, while providing coverage for both
supported compression algorithms.
---
 src/Makefile.global.in                       |    1 +
 src/test/regress/GNUmakefile                 |    4 +
 src/test/regress/expected/join_hash_lz4.out  | 1167 ++++++++++++++++++
 src/test/regress/expected/join_hash_pglz.out | 1167 ++++++++++++++++++
 src/test/regress/parallel_schedule           |    4 +-
 src/test/regress/sql/join_hash_lz4.sql       |  627 ++++++++++
 src/test/regress/sql/join_hash_pglz.sql      |  627 ++++++++++
 7 files changed, 3596 insertions(+), 1 deletion(-)
 create mode 100644 src/test/regress/expected/join_hash_lz4.out
 create mode 100644 src/test/regress/expected/join_hash_pglz.out
 create mode 100644 src/test/regress/sql/join_hash_lz4.sql
 create mode 100644 src/test/regress/sql/join_hash_pglz.sql

diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 947a2d79e29..87c950cca82 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -201,6 +201,7 @@ with_liburing	= @with_liburing@
 with_libxml	= @with_libxml@
 with_libxslt	= @with_libxslt@
 with_llvm	= @with_llvm@
+with_lz4	= @with_lz4@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
diff --git a/src/test/regress/GNUmakefile b/src/test/regress/GNUmakefile
index a8ba19e5971..5c4e25bc07c 100644
--- a/src/test/regress/GNUmakefile
+++ b/src/test/regress/GNUmakefile
@@ -94,6 +94,10 @@ installdirs-tests: installdirs
 REGRESS_OPTS = --dlpath=. --max-concurrent-tests=20 \
 	$(EXTRA_REGRESS_OPTS)
 
+ifeq ($(with_lz4),yes)
+override EXTRA_TESTS := $(EXTRA_TESTS) join_hash_lz4
+endif
+
 check: all
 	$(pg_regress_check) $(REGRESS_OPTS) --schedule=$(srcdir)/parallel_schedule $(MAXCONNOPT) $(EXTRA_TESTS)
 
diff --git a/src/test/regress/expected/join_hash_lz4.out b/src/test/regress/expected/join_hash_lz4.out
new file mode 100644
index 00000000000..78367bf3719
--- /dev/null
+++ b/src/test/regress/expected/join_hash_lz4.out
@@ -0,0 +1,1167 @@
+--
+-- exercises for the hash join code
+--
+begin;
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'lz4';
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on bigger_than_it_looks s
+(6 rows)
+
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                    QUERY PLAN                    
+--------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on extremely_skewed s
+(6 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                       QUERY PLAN                       
+--------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Hash
+                     ->  Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     4
+(1 row)
+
+rollback to settings;
+-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+analyze join_foo, join_bar;
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Join
+                     Hash Cond: (wide.id = wide_1.id)
+                     ->  Parallel Seq Scan on wide
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length 
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+ROLLBACK TO settings;
+rollback;
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: ((hjtest_1.id = (SubPlan expr_1)) AND ((SubPlan expr_2) = (SubPlan expr_3)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_1
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         Filter: ((SubPlan expr_4) < 50)
+         SubPlan expr_4
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   ->  Hash
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         ->  Seq Scan on public.hjtest_2
+               Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+               Filter: ((SubPlan expr_5) < 55)
+               SubPlan expr_5
+                 ->  Result
+                       Output: (hjtest_2.c * 5)
+         SubPlan expr_1
+           ->  Result
+                 Output: 1
+                 One-Time Filter: (hjtest_2.id = 1)
+         SubPlan expr_3
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   SubPlan expr_2
+     ->  Result
+           Output: (hjtest_1.b * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: (((SubPlan expr_1) = hjtest_1.id) AND ((SubPlan expr_3) = (SubPlan expr_2)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_2
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         Filter: ((SubPlan expr_5) < 55)
+         SubPlan expr_5
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   ->  Hash
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         ->  Seq Scan on public.hjtest_1
+               Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+               Filter: ((SubPlan expr_4) < 50)
+               SubPlan expr_4
+                 ->  Result
+                       Output: (hjtest_1.b * 5)
+         SubPlan expr_2
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   SubPlan expr_1
+     ->  Result
+           Output: 1
+           One-Time Filter: (hjtest_2.id = 1)
+   SubPlan expr_3
+     ->  Result
+           Output: (hjtest_2.c * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+ROLLBACK;
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Nested Loop
+   ->  Seq Scan on int8_tbl i8
+   ->  Sort
+         Sort Key: t1.fivethous, i4.f1
+         ->  Hash Join
+               Hash Cond: (t1.fivethous = (i4.f1 + i8.q2))
+               ->  Seq Scan on tenk1 t1
+               ->  Hash
+                     ->  Seq Scan on int4_tbl i4
+(9 rows)
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+ q2  | fivethous | f1 
+-----+-----------+----
+ 456 |       456 |  0
+ 456 |       456 |  0
+ 123 |       123 |  0
+ 123 |       123 |  0
+(4 rows)
+
+rollback;
diff --git a/src/test/regress/expected/join_hash_pglz.out b/src/test/regress/expected/join_hash_pglz.out
new file mode 100644
index 00000000000..c6ae5e255d2
--- /dev/null
+++ b/src/test/regress/expected/join_hash_pglz.out
@@ -0,0 +1,1167 @@
+--
+-- exercises for the hash join code
+--
+begin;
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'pglz';
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on bigger_than_it_looks s
+(6 rows)
+
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                    QUERY PLAN                    
+--------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on extremely_skewed s
+(6 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                       QUERY PLAN                       
+--------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Hash
+                     ->  Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     4
+(1 row)
+
+rollback to settings;
+-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+analyze join_foo, join_bar;
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Join
+                     Hash Cond: (wide.id = wide_1.id)
+                     ->  Parallel Seq Scan on wide
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length 
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+ROLLBACK TO settings;
+rollback;
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: ((hjtest_1.id = (SubPlan expr_1)) AND ((SubPlan expr_2) = (SubPlan expr_3)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_1
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         Filter: ((SubPlan expr_4) < 50)
+         SubPlan expr_4
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   ->  Hash
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         ->  Seq Scan on public.hjtest_2
+               Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+               Filter: ((SubPlan expr_5) < 55)
+               SubPlan expr_5
+                 ->  Result
+                       Output: (hjtest_2.c * 5)
+         SubPlan expr_1
+           ->  Result
+                 Output: 1
+                 One-Time Filter: (hjtest_2.id = 1)
+         SubPlan expr_3
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   SubPlan expr_2
+     ->  Result
+           Output: (hjtest_1.b * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: (((SubPlan expr_1) = hjtest_1.id) AND ((SubPlan expr_3) = (SubPlan expr_2)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_2
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         Filter: ((SubPlan expr_5) < 55)
+         SubPlan expr_5
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   ->  Hash
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         ->  Seq Scan on public.hjtest_1
+               Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+               Filter: ((SubPlan expr_4) < 50)
+               SubPlan expr_4
+                 ->  Result
+                       Output: (hjtest_1.b * 5)
+         SubPlan expr_2
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   SubPlan expr_1
+     ->  Result
+           Output: 1
+           One-Time Filter: (hjtest_2.id = 1)
+   SubPlan expr_3
+     ->  Result
+           Output: (hjtest_2.c * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+ROLLBACK;
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Nested Loop
+   ->  Seq Scan on int8_tbl i8
+   ->  Sort
+         Sort Key: t1.fivethous, i4.f1
+         ->  Hash Join
+               Hash Cond: (t1.fivethous = (i4.f1 + i8.q2))
+               ->  Seq Scan on tenk1 t1
+               ->  Hash
+                     ->  Seq Scan on int4_tbl i4
+(9 rows)
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+ q2  | fivethous | f1 
+-----+-----------+----
+ 456 |       456 |  0
+ 456 |       456 |  0
+ 123 |       123 |  0
+ 123 |       123 |  0
+(4 rows)
+
+rollback;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 021d57f66bb..eeb06407d11 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -15,7 +15,6 @@ test: test_setup
 # The first group of parallel tests
 # ----------
 test: boolean char name varchar text int2 int4 int8 oid float4 float8 bit numeric txid uuid enum money rangetypes pg_lsn regproc
-
 # ----------
 # The second group of parallel tests
 # multirangetypes depends on rangetypes
@@ -140,3 +139,6 @@ test: fast_default
 # run tablespace test at the end because it drops the tablespace created during
 # setup that other tests may use.
 test: tablespace
+
+# this test is equivalent to join_hash test just the compression is enabled
+test: join_hash_pglz
diff --git a/src/test/regress/sql/join_hash_lz4.sql b/src/test/regress/sql/join_hash_lz4.sql
new file mode 100644
index 00000000000..7f79242c306
--- /dev/null
+++ b/src/test/regress/sql/join_hash_lz4.sql
@@ -0,0 +1,627 @@
+--
+-- exercises for the hash join code
+--
+
+begin;
+
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'lz4';
+
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- A couple of other hash join tests unrelated to work_mem management.
+
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+analyze join_foo, join_bar;
+
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- A full outer join where every record is matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
+
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ROLLBACK TO settings;
+
+rollback;
+
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+ROLLBACK;
+
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+rollback;
diff --git a/src/test/regress/sql/join_hash_pglz.sql b/src/test/regress/sql/join_hash_pglz.sql
new file mode 100644
index 00000000000..a7529ec3400
--- /dev/null
+++ b/src/test/regress/sql/join_hash_pglz.sql
@@ -0,0 +1,627 @@
+--
+-- exercises for the hash join code
+--
+
+begin;
+
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'pglz';
+
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- A couple of other hash join tests unrelated to work_mem management.
+
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+analyze join_foo, join_bar;
+
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- A full outer join where every record is matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
+
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ROLLBACK TO settings;
+
+rollback;
+
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+ROLLBACK;
+
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+rollback;
-- 
2.39.5 (Apple Git-154)



  [application/octet-stream] 0001-Add-transparent-compression-for-temporary-files.patch (21.0K, 4-0001-Add-transparent-compression-for-temporary-files.patch)
  download | inline diff:
From 2e5e6bdc3d28e342f525fb0f1cf76719f7a3213b Mon Sep 17 00:00:00 2001
From: Filip Janus <[email protected]>
Date: Thu, 31 Jul 2025 14:02:16 +0200
Subject: [PATCH 1/2] Add transparent compression for temporary files

This commit implements transparent compression for temporary files in PostgreSQL,
specifically designed for hash join operations that spill to disk.

Features:
- Support for LZ4 and PGLZ compression algorithms
- GUC parameter 'temp_file_compression' to control compression
- Transparent compression/decompression in BufFile layer
- Shared compression buffer to minimize memory allocation
- Integration: Hash join uses BufFileCreateCompressTemp(), tuplestore
     uses compression when EXEC_FLAG_BACKWARD is not required

The compression is applied automatically when temp_file_compression is enabled,
with no changes required to calling code. Only hash joins use compression
currently, with seeking limited to rewinding to start.

Configuration options:
- temp_file_compression = 'no' (default)
- temp_file_compression = 'pglz'
- temp_file_compression = 'lz4' (requires --with-lz4)

This patch also adds documentation for guc-temp-file-compression (contributed by Lakshmi <[email protected]>).
---
 doc/src/sgml/config.sgml                      |  20 +
 src/backend/executor/nodeHashjoin.c           |   2 +-
 src/backend/storage/file/buffile.c            | 374 +++++++++++++++++-
 src/backend/utils/misc/guc_parameters.dat     |   8 +
 src/backend/utils/misc/guc_tables.c           |  10 +
 src/backend/utils/misc/postgresql.conf.sample |   5 +-
 src/backend/utils/sort/tuplestore.c           |   8 +-
 src/include/storage/buffile.h                 |  10 +
 8 files changed, 414 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5560b95ee60..179e09a8604 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1955,6 +1955,26 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+    <varlistentry id="guc-temp-file-compression">
+     <term><varname>temp_file_compression</varname> (<type>enum</type>)</term>
+     <listitem>
+     <para>
+      Enables transparent compression of temporary files used by query execution.
+      Supported values are <literal>no</literal>, <literal>lz4</literal>, and
+     <literal>pglz</literal>.
+     </para>
+
+     <para>
+     The effectiveness of temporary file compression depends on the workload.
+     For example, temporary files created by hash join spills use fixed-size
+     chunks, so on-disk file sizes may not visibly shrink even when compression
+     is enabled. Statistics such as <literal>temp_bytes</literal> report logical
+     bytes written before compression.
+     </para>
+     </listitem>
+     </varlistentry>
+
+
      <varlistentry id="guc-hash-mem-multiplier" xreflabel="hash_mem_multiplier">
       <term><varname>hash_mem_multiplier</varname> (<type>floating point</type>)
       <indexterm>
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 114620a8137..c2f1b2658e1 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateCompressTemp(false);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index ddf3a410d6f..e6d2af0f815 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -53,6 +53,12 @@
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "utils/resowner.h"
+#include "utils/memutils.h"
+#include "common/pg_lzcompress.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
 
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
@@ -62,6 +68,11 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+/*
+ * Optional transparent compression of temporary files. Disabled by default.
+ */
+int			temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -101,8 +112,28 @@ struct BufFile
 	 * wasting per-file alignment padding when some users create many files.
 	 */
 	PGAlignedBlock buffer;
+
+	int			compress;		/* enabled compression for the file */
+	char	   *cBuffer;		/* compression buffer */
 };
 
+/*
+ * Header written right before each chunk of data with compression enabled.
+ * The 'len' is the length of the data buffer written right after the header,
+ * and 'raw_len' is the length of uncompressed data. If the data ends up not
+ * being compressed (e.g. when pglz does not reach the compression ratio),
+ * the raw_len is set to -1 and the len is the raw (uncompressed) length.
+ *
+ * To make things simpler, we write these headers even for methods that do
+ * not fail (or rather when they fail, it's a proper error). The space for
+ * an extra integer seems negligible.
+ */
+typedef struct CompressHeader
+{
+	int			len;			/* data length (compressed, excluding header) */
+	int			raw_len;		/* raw length (-1: not compressed) */
+}			CompressHeader;
+
 static BufFile *makeBufFileCommon(int nfiles);
 static BufFile *makeBufFile(File firstfile);
 static void extendBufFile(BufFile *file);
@@ -127,6 +158,8 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress = TEMP_NONE_COMPRESSION;
+	file->cBuffer = NULL;
 
 	return file;
 }
@@ -215,6 +248,63 @@ BufFileCreateTemp(bool interXact)
 	return file;
 }
 
+/*
+ * BufFileCreateCompressTemp
+ *		Create a temporary file with transparent compression.
+ *
+ * The temporary files will use compression, depending on the current value of
+ * temp_file_compression GUC.
+ *
+ * Note: Compressed files do not support random access. A seek operation other
+ * than seek to the beginning of the buffile will corrupt data.
+ *
+ * Note: The compression algorithm is determined by temp_file_compression GUC.
+ * If set to "none" (TEMP_NONE_COMPRESSION), the file is not compressed.
+ *
+ */
+BufFile *
+BufFileCreateCompressTemp(bool interXact)
+{
+	BufFile    *file = BufFileCreateTemp(interXact);
+
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+		int			size = 0;
+
+		switch (temp_file_compression)
+		{
+			case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+				size = LZ4_compressBound(BLCKSZ) + sizeof(CompressHeader);
+#else
+				elog(ERROR, "LZ4 is not supported by this build");
+#endif
+				break;
+			case TEMP_PGLZ_COMPRESSION:
+				size = pglz_maximum_compressed_size(BLCKSZ, BLCKSZ) + sizeof(CompressHeader);
+				break;
+			case TEMP_NONE_COMPRESSION:
+				/* no compression, nothing to do */
+				break;
+			default:
+				elog(ERROR, "unknown compression method: %d", temp_file_compression);
+				break;
+		}
+
+		if (size > 0)
+		{
+			file->compress = temp_file_compression;
+			file->cBuffer = palloc(size);
+		}
+	}
+
+	/* compression with buffer, or no compression and no buffer */
+	Assert((!file->compress && file->cBuffer == NULL) ||
+		   (file->compress && file->cBuffer != NULL));
+
+	return file;
+}
+
 /*
  * Build the name for a given segment of a given BufFile.
  */
@@ -415,9 +505,15 @@ BufFileClose(BufFile *file)
 
 	/* flush any unwritten data */
 	BufFileFlush(file);
+
 	/* close and delete the underlying file(s) */
 	for (i = 0; i < file->numFiles; i++)
 		FileClose(file->files[i]);
+
+	/* release compression buffer if allocated */
+	if (file->cBuffer)
+		pfree(file->cBuffer);
+
 	/* release the buffer space */
 	pfree(file->files);
 	pfree(file);
@@ -454,21 +550,145 @@ BufFileLoadBuffer(BufFile *file)
 	else
 		INSTR_TIME_SET_ZERO(io_start);
 
-	/*
-	 * Read whatever we can get, up to a full bufferload.
-	 */
-	file->nbytes = FileRead(thisfile,
-							file->buffer.data,
-							sizeof(file->buffer.data),
-							file->curOffset,
-							WAIT_EVENT_BUFFILE_READ);
-	if (file->nbytes < 0)
+	if (file->compress == TEMP_NONE_COMPRESSION)
 	{
-		file->nbytes = 0;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						FilePathName(thisfile))));
+		/*
+		 * Read whatever we can get, up to a full bufferload.
+		 */
+		file->nbytes = FileRead(thisfile,
+								file->buffer.data,
+								sizeof(file->buffer),
+								file->curOffset,
+								WAIT_EVENT_BUFFILE_READ);
+		if (file->nbytes < 0)
+		{
+			file->nbytes = 0;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m",
+							FilePathName(thisfile))));
+		}
+	}
+	else
+	{
+		/*
+		 * Read and decompress data from a temporary file. We first read the
+		 * header with compressed/raw lengths, and then the compressed data.
+		 */
+		int			nread;
+		CompressHeader header;
+
+		nread = FileRead(thisfile,
+						 &header,
+						 sizeof(header),
+						 file->curOffset,
+						 WAIT_EVENT_BUFFILE_READ);
+
+		/* did we read the length of the next buffer? */
+		if (nread == 0)
+		{
+			/* eof, nothing to do */
+		}
+		else if (nread != sizeof(header))
+		{
+			/* unexpected number of bytes, also covers (nread < 0) */
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m",
+							FilePathName(thisfile))));
+		}
+		else
+		{
+			/* read length of compressed data, read (and decompress) data */
+			char	   *buff = file->cBuffer;
+
+			Assert(file->cBuffer != NULL);
+
+			/* advance past the length field */
+			file->curOffset += sizeof(header);
+
+			/*
+			 * raw_len==-1 means the data was not compressed after all, which
+			 * can happen e.g. for non-compressible data with pglz. In that
+			 * case just copy the data in place. Otherwise do the
+			 * decompression.
+			 *
+			 * XXX Maybe we should just do the FileRead first, and then either
+			 * decompress or memcpy() for raw_len=-1. That'd be an extra
+			 * memcpy, but it'd make the code simpler (this ways we do the
+			 * error checks twice, for each branch).
+			 */
+			if (header.raw_len == -1)
+			{
+				nread = FileRead(thisfile,
+								 file->buffer.data,
+								 header.len,
+								 file->curOffset,
+								 WAIT_EVENT_BUFFILE_READ);
+				if (nread != header.len)
+				{
+					ereport(ERROR,
+							(errcode_for_file_access(),
+							 errmsg("could not read file \"%s\": %m",
+									FilePathName(thisfile))));
+				}
+
+				file->nbytes = nread;
+				file->curOffset += nread;
+			}
+			else
+			{
+				/*
+				 * Read compressed data into the separate buffer, and then
+				 * decompress into the target file buffer.
+				 */
+				nread = FileRead(thisfile,
+								 buff,
+								 header.len,
+								 file->curOffset,
+								 WAIT_EVENT_BUFFILE_READ);
+				if (nread != header.len)
+				{
+					ereport(ERROR,
+							(errcode_for_file_access(),
+							 errmsg("could not read file \"%s\": %m",
+									FilePathName(thisfile))));
+				}
+
+				switch (file->compress)
+				{
+					case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+						file->nbytes = LZ4_decompress_safe(buff,
+														   file->buffer.data, header.len,
+														   sizeof(file->buffer));
+#else
+						elog(ERROR, "LZ4 is not supported by this build");
+#endif
+						break;
+
+					case TEMP_PGLZ_COMPRESSION:
+						file->nbytes = pglz_decompress(buff, header.len,
+													   file->buffer.data, header.raw_len, false);
+						break;
+					case TEMP_NONE_COMPRESSION:
+						/* no compression, nothing to do */
+						break;
+					default:
+						elog(ERROR, "unknown compression method: %d", file->compress);
+						break;
+				}
+				file->curOffset += nread;
+
+				if (file->nbytes < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg_internal("compressed data is corrupt")));
+
+				/* should have got the expected length */
+				Assert(file->nbytes == header.raw_len);
+			}
+		}
 	}
 
 	if (track_io_timing)
@@ -494,8 +714,101 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int64		wpos = 0;
-	int64		bytestowrite;
+	int64		bytestowrite = 0;
 	File		thisfile;
+	char	   *DataToWrite = file->buffer.data;
+	int			nbytesOriginal = file->nbytes;
+
+	/*
+	 * Compress the data if requested for this temporary file (and if enabled
+	 * by the temp_file_compression GUC).
+	 *
+	 * The compressed data is written to the one shared compression buffer.
+	 * There's only a single compression operation at any given time, so one
+	 * buffer is enough.
+	 *
+	 * Then we simply point the "DataToWrite" buffer at the compressed buffer.
+	 */
+	if (file->compress != TEMP_NONE_COMPRESSION)
+	{
+		char	   *cData;
+		int			cSize = 0;
+		CompressHeader header;
+
+		Assert(file->cBuffer != NULL);
+		cData = file->cBuffer;
+
+		/* initialize the header for compression */
+		header.len = -1;
+		header.raw_len = nbytesOriginal;
+
+		switch (file->compress)
+		{
+		case TEMP_LZ4_COMPRESSION:
+			{
+#ifdef USE_LZ4
+				int			cBufferSize = LZ4_compressBound(file->nbytes);
+
+				/*
+				 * XXX We might use lz4 stream compression here. Depending
+				 * on the data, that might improve the compression ratio.
+				 * The length is stored at the beginning, we'll fill it in
+				 * at the end.
+				 */
+				cSize = LZ4_compress_default(file->buffer.data,
+											 cData + sizeof(CompressHeader),
+											 file->nbytes, cBufferSize);
+				if (cSize == 0)
+				{
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg_internal("compression failed, compressed size %d, original size %d",
+											 cSize, nbytesOriginal)));
+				}
+#else
+				elog(ERROR, "LZ4 is not supported by this build");
+#endif
+				break;
+			}
+			case TEMP_PGLZ_COMPRESSION:
+				cSize = pglz_compress(file->buffer.data, file->nbytes,
+									  cData + sizeof(CompressHeader),
+									  PGLZ_strategy_always);
+
+				/*
+				 * pglz returns -1 for non-compressible data. In that case
+				 * just copy the raw data into the output buffer.
+				 */
+				if (cSize == -1)
+				{
+					memcpy(cData + sizeof(CompressHeader), file->buffer.data,
+						   header.raw_len);
+
+					cSize = header.raw_len;
+					header.raw_len = -1;
+				}
+				break;
+			case TEMP_NONE_COMPRESSION:
+				/* no compression, nothing to do */
+				break;
+			default:
+				elog(ERROR, "unknown compression method: %d", file->compress);
+				break;
+		}
+
+		Assert(cSize != -1);
+		header.len = cSize;
+
+		/*
+		 * Write the header with compressed length at the beginning of the
+		 * buffer. We store both the compressed and raw lengths, and use
+		 * raw_len=-1 when the data was not compressed after all.
+		 */
+		memcpy(cData, &header, sizeof(CompressHeader));
+		file->nbytes = header.len + sizeof(CompressHeader);
+
+		DataToWrite = cData;
+	}
 
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
@@ -535,7 +848,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +877,17 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else if (nbytesOriginal - file->pos != 0)
+	{
+		/*
+		 * curOffset must be corrected also if compression is enabled, nbytes
+		 * was changed by compression but we have to use the original value of
+		 * nbytes
+		 */
+		file->curOffset -= bytestowrite;
+	}
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -602,8 +925,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 	{
 		if (file->pos >= file->nbytes)
 		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
+			/*
+			 * Try to load more data into buffer.
+			 *
+			 * curOffset is moved within BufFileLoadBuffer because stored data
+			 * size differs from loaded/ decompressed size
+			 */
+			if (!file->compress)
+				file->curOffset += file->pos;
 			file->pos = 0;
 			file->nbytes = 0;
 			BufFileLoadBuffer(file);
@@ -742,6 +1071,10 @@ BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
 	int			newFile;
 	pgoff_t		newOffset;
 
+	/* Compressed files only support seek to the beginning */
+	Assert(file->compress == TEMP_NONE_COMPRESSION ||
+		   (whence == SEEK_SET && fileno == 0 && offset == 0));
+
 	switch (whence)
 	{
 		case SEEK_SET:
@@ -831,6 +1164,9 @@ BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
 void
 BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
 {
+	/* Tell doesn't work correctly for compressed files */
+	Assert(file->compress == TEMP_NONE_COMPRESSION);
+
 	*fileno = file->curFile;
 	*offset = file->curOffset + file->pos;
 }
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index f0260e6e412..55bf207637f 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -2931,6 +2931,14 @@
   check_hook => 'check_temp_buffers',
 },
 
+{ name => 'temp_file_compression', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT',
+  short_desc => 'Sets the default compression method for temporary files.',
+  long_desc => 'Compression is applied transparently to temporary files used by hash joins. Note that the logged file sizes may appear the same regardless of compression method due to fixed-size chunking; actual disk space savings depend on data compressibility.',
+  variable => 'temp_file_compression',
+  boot_val => 'TEMP_NONE_COMPRESSION',
+  options => 'temp_file_compression_options',
+},
+
 { name => 'temp_file_limit', type => 'int', context => 'PGC_SUSET', group => 'RESOURCES_DISK',
   short_desc => 'Limits the total size of all temporary files used by each process.',
   long_desc => '-1 means no limit.',
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 13c569d8790..945e00920cd 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -78,6 +78,7 @@
 #include "replication/syncrep.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/bufpage.h"
 #include "storage/copydir.h"
 #include "storage/io_worker.h"
@@ -464,6 +465,15 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 	{NULL, 0, false}
 };
 
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+	{"pglz", TEMP_PGLZ_COMPRESSION, false},
+#ifdef  USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
+
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
 #ifdef USE_LZ4
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c4f92fcdac8..db23696d243 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -180,8 +180,9 @@
 
 #file_copy_method = copy                # copy, clone (if supported by OS)
 
-#max_notify_queue_pages = 1048576       # limits the number of SLRU pages allocated
-                                        # for NOTIFY / LISTEN queue
+#max_notify_queue_pages = 1048576	# limits the number of SLRU pages allocated
+					# for NOTIFY / LISTEN queue
+#temp_file_compression = 'no'		# enables temporary files compression
 
 # - Kernel Resources -
 
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index e57fa5bca64..76695e4c9e4 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,13 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
 			 */
 			oldcxt = MemoryContextSwitchTo(state->context->parent);
 
-			state->myfile = BufFileCreateTemp(state->interXact);
+			/*
+			 * If requested random access, can't compress the temp file.
+			 */
+			if ((state->eflags & EXEC_FLAG_BACKWARD) != 0)
+				state->myfile = BufFileCreateTemp(state->interXact);
+			else
+				state->myfile = BufFileCreateCompressTemp(state->interXact);
 
 			MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 6754f37836c..2e52ca74bde 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,21 @@
 
 typedef struct BufFile BufFile;
 
+typedef enum
+{
+	TEMP_NONE_COMPRESSION,
+	TEMP_PGLZ_COMPRESSION,
+	TEMP_LZ4_COMPRESSION
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
 /*
  * prototypes for functions in buffile.c
  */
 
 extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateCompressTemp(bool interXact);
 extern void BufFileClose(BufFile *file);
 pg_nodiscard extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
-- 
2.39.5 (Apple Git-154)



reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected]
  Subject: Re: Proposal: Adding compression of temporary files
  In-Reply-To: <CAFjYY+LMTciR=3SLh+8EbAFjumQTrcTKbyU703Srzy3j_yEhSw@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox