public inbox for [email protected]  
help / color / mirror / Atom feed
From: Andres Freund <[email protected]>
To: Melanie Plageman <[email protected]>
Cc: Heikki Linnakangas <[email protected]>
Cc: Kirill Reshke <[email protected]>
Cc: Matthias van de Meent <[email protected]>
Cc: [email protected]
Cc: Thomas Munro <[email protected]>
Cc: Noah Misch <[email protected]>
Cc: Robert Haas <[email protected]>
Cc: Michael Paquier <[email protected]>
Subject: Re: Buffer locking is special (hints, checksums, AIO writes)
Date: Wed, 14 Jan 2026 16:20:58 -0500
Message-ID: <k5j77f3q6ztihnjnx2nqxzyor6fbj2qxcbhzuxhkh2yy63jyfg@p72phigar3n4> (raw)
In-Reply-To: <jtg5cu4n6h5lib3kzx66ju4yhh6kmviaud7oq6dtut6c4q4rdi@xwsfoagt3c2b>
References: <[email protected]>
	<q3ybyaiownp4ivt6xfy55s5llvtq6vqsd4zbt5kn6hacxdcpp4@2ngd4h4bev5d>
	<dsbacri4xldlobl3jbqqlksefsyrwj7lzljbohxsbqjus3z4o7@otb6rge4w7az>
	<[email protected]>
	<5dwlfu2jyzkyf3nrlzxxblxctb6xio5es73ptgsahjnmfu5miu@772rc764hfhi>
	<ossv2eistssmubfsir6xjll76tynvxv5lup4zkrfzjkud7fycw@rf5vii6l6cha>
	<4csodkvvfbfloxxjlkgsnl2lgfv2mtzdl7phqzd4jxjadxm4o5@usw7feyb5bzf>
	<CALdSSPgyc7VuMLUZ8J7v2G-PebBa_vEi+mp-cLkcOwNycB56Hw@mail.gmail.com>
	<kjdgvws34gpnz4y6xu5aaeul5mspgy3ahvyiw4lndb3ecsacdb@b2ccyowotpqh>
	<jtg5cu4n6h5lib3kzx66ju4yhh6kmviaud7oq6dtut6c4q4rdi@xwsfoagt3c2b>

Hi,

On 2026-01-12 19:33:56 -0500, Andres Freund wrote:
> Here are the remaining commits, with a bit of polish:

I pushed 0001, 0002.

Attached is an updated version of the remaining changes:

- I updated the definition in BUF_DEFINE_FLAG to have a redundant copy of
  BUF_FLAG_SHIFT's "contents", as suggested by Chao

- I realized I had forgotten to remove the BufferContent lwlock tranche

- Updated the FIXME comment about using PGPROC->lw* to not be a fixme anymore,
  it seems nobody is pushing back against that being ugly-but-reasonable for now

- I renamed ResOwnerReleaseBufferPin etc to ResOwnerReleaseBuffer, as I
  suggested nearby.

- Added a commit removing ForEachLWLockHeldByMe, now that it's not used
  anymore. I checked in with Noah, who added it, and he's on-board with that
  plan.

- Added a commit removing LWLockDisown(), LWLockReleaseDisowned(). They were
  added for AIO and AIO doesn't need them anymore, as that's implemented
  purely in bufmgr.c now. I don't see a reason to keep them...

- I reflowed the comments / README in "Require share-exclusive lock to set hint bits and to flush"
  and removed the FIXME about that

- Removed "FIXME: The start of the comment above needs updating." from the
  above commit, I already had rewritten the comment, just hadn't removed the
  FIXME yet


I tried putting the new code in a header, as we had discussed, but that turns
out to not work easily: The locking code needs access to the private-refcount
infrastructure and we can't put the private refcount infrastructure into a
header without making PrivateRef* non-static, which in turn causes slightly
worse code generation.


I'm now working on cleaning up the last two commits. The most crucial bit is
to simplify what happens in MarkSharedBufferDirtyHint(), we afaict can delete
the use of DELAY_CHKPT_START etc and just go to marking the buffer dirty first
and then do the WAL logging, just like normal WAL logging. The previous order
was only required because we were dirtying the page while holding only a
shared lock, which did not conflict with the lock held by SyncBuffers() etc.

There are some comments that arguably should be updated in 0005, but will only
be updated in 0006. I don't really see how to address that without squashing
the two commits though - which I think wouldn't be good, as the necessary
changes are decidedly nontrivial.

Greetings,

Andres Freund


Attachments:

  [text/x-diff] v11-0001-bufmgr-Change-BufferDesc.state-to-be-a-64-bit-at.patch (45.5K, 2-v11-0001-bufmgr-Change-BufferDesc.state-to-be-a-64-bit-at.patch)
  download | inline diff:
From 65c9d72a531b26f7461392557d354f385b0c404d Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 13 Jan 2026 20:10:32 -0500
Subject: [PATCH v11 1/7] bufmgr: Change BufferDesc.state to be a 64-bit atomic

This is motivated by wanting to merge buffer content locks into
BufferDesc.state in a future commit, rather than having a separate lwlock (see
commit c75ebc657ff for more details). As this change is rather mechanical, it
seems to make sense to split it out into a separate commit, for easier review.

Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/fvfmkr5kk4nyex56ejgxj3uzi63isfxovp2biecb4bspbjrze7@az2pljabhnff
---
 src/include/storage/buf_internals.h           |  51 +++---
 src/include/storage/procnumber.h              |  14 +-
 src/backend/storage/buffer/buf_init.c         |   2 +-
 src/backend/storage/buffer/bufmgr.c           | 170 +++++++++---------
 src/backend/storage/buffer/freelist.c         |  24 +--
 src/backend/storage/buffer/localbuf.c         |  72 ++++----
 contrib/pg_buffercache/pg_buffercache_pages.c |   8 +-
 contrib/pg_prewarm/autoprewarm.c              |   2 +-
 src/test/modules/test_aio/test_aio.c          |  12 +-
 9 files changed, 179 insertions(+), 176 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 2f607ea2ac5..e6e788224f5 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -30,7 +30,7 @@
 #include "utils/resowner.h"
 
 /*
- * Buffer state is a single 32-bit variable where following data is combined.
+ * Buffer state is a single 64-bit variable where following data is combined.
  *
  * State of the buffer itself (in order):
  * - 18 bits refcount
@@ -40,6 +40,9 @@
  * Combining these values allows to perform some operations without locking
  * the buffer header, by modifying them together with a CAS loop.
  *
+ * NB: A future commit will use a significant portion of the remaining bits to
+ * implement buffer locking as part of the state variable.
+ *
  * The definition of buffer state components is below.
  */
 #define BUF_REFCOUNT_BITS 18
@@ -52,27 +55,27 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
 /* refcount related definitions */
 #define BUF_REFCOUNT_ONE 1
 #define BUF_REFCOUNT_MASK \
-	((1U << BUF_REFCOUNT_BITS) - 1)
+	((UINT64CONST(1) << BUF_REFCOUNT_BITS) - 1)
 
 /* usage count related definitions */
 #define BUF_USAGECOUNT_SHIFT \
 	BUF_REFCOUNT_BITS
 #define BUF_USAGECOUNT_MASK \
-	(((1U << BUF_USAGECOUNT_BITS) - 1) << (BUF_USAGECOUNT_SHIFT))
+	(((UINT64CONST(1) << BUF_USAGECOUNT_BITS) - 1) << (BUF_USAGECOUNT_SHIFT))
 #define BUF_USAGECOUNT_ONE \
-	(1U << BUF_REFCOUNT_BITS)
+	(UINT64CONST(1) << BUF_REFCOUNT_BITS)
 
 /* flags related definitions */
 #define BUF_FLAG_SHIFT \
 	(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS)
 #define BUF_FLAG_MASK \
-	(((1U << BUF_FLAG_BITS) - 1) << BUF_FLAG_SHIFT)
+	(((UINT64CONST(1) << BUF_FLAG_BITS) - 1) << BUF_FLAG_SHIFT)
 
 /* Get refcount and usagecount from buffer state */
 #define BUF_STATE_GET_REFCOUNT(state) \
-	((state) & BUF_REFCOUNT_MASK)
+	((uint32)((state) & BUF_REFCOUNT_MASK))
 #define BUF_STATE_GET_USAGECOUNT(state) \
-	(((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)
+	((uint32)(((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT))
 
 /*
  * Flags for buffer descriptors
@@ -82,7 +85,7 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
  */
 
 #define BUF_DEFINE_FLAG(flagno)	\
-	(1U << (BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + (flagno)))
+	(UINT64CONST(1) << (BUF_FLAG_SHIFT + (flagno)))
 
 /* buffer header is locked */
 #define BM_LOCKED					BUF_DEFINE_FLAG( 0)
@@ -115,7 +118,7 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
  */
 #define BM_MAX_USAGE_COUNT	5
 
-StaticAssertDecl(BM_MAX_USAGE_COUNT < (1 << BUF_USAGECOUNT_BITS),
+StaticAssertDecl(BM_MAX_USAGE_COUNT < (UINT64CONST(1) << BUF_USAGECOUNT_BITS),
 				 "BM_MAX_USAGE_COUNT doesn't fit in BUF_USAGECOUNT_BITS bits");
 StaticAssertDecl(MAX_BACKENDS_BITS <= BUF_REFCOUNT_BITS,
 				 "MAX_BACKENDS_BITS needs to be <= BUF_REFCOUNT_BITS");
@@ -280,8 +283,8 @@ BufMappingPartitionLockByIndex(uint32 index)
  * We use this same struct for local buffer headers, but the locks are not
  * used and not all of the flag bits are useful either. To avoid unnecessary
  * overhead, manipulations of the state field should be done without actual
- * atomic operations (i.e. only pg_atomic_read_u32() and
- * pg_atomic_unlocked_write_u32()).
+ * atomic operations (i.e. only pg_atomic_read_u64() and
+ * pg_atomic_unlocked_write_u64()).
  *
  * Be careful to avoid increasing the size of the struct when adding or
  * reordering members.  Keeping it below 64 bytes (the most common CPU
@@ -309,7 +312,7 @@ typedef struct BufferDesc
 	 * State of the buffer, containing flags, refcount and usagecount. See
 	 * BUF_* and BM_* defines at the top of this file.
 	 */
-	pg_atomic_uint32 state;
+	pg_atomic_uint64 state;
 
 	/*
 	 * Backend of pin-count waiter. The buffer header spinlock needs to be
@@ -415,7 +418,7 @@ BufferDescriptorGetContentLock(const BufferDesc *bdesc)
  * Functions for acquiring/releasing a shared buffer header's spinlock.  Do
  * not apply these to local buffers!
  */
-extern uint32 LockBufHdr(BufferDesc *desc);
+extern uint64 LockBufHdr(BufferDesc *desc);
 
 /*
  * Unlock the buffer header.
@@ -426,9 +429,9 @@ extern uint32 LockBufHdr(BufferDesc *desc);
 static inline void
 UnlockBufHdr(BufferDesc *desc)
 {
-	Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED);
+	Assert(pg_atomic_read_u64(&desc->state) & BM_LOCKED);
 
-	pg_atomic_fetch_sub_u32(&desc->state, BM_LOCKED);
+	pg_atomic_fetch_sub_u64(&desc->state, BM_LOCKED);
 }
 
 /*
@@ -439,14 +442,14 @@ UnlockBufHdr(BufferDesc *desc)
  * Note that this approach would not work for usagecount, since we need to cap
  * the usagecount at BM_MAX_USAGE_COUNT.
  */
-static inline uint32
-UnlockBufHdrExt(BufferDesc *desc, uint32 old_buf_state,
-				uint32 set_bits, uint32 unset_bits,
+static inline uint64
+UnlockBufHdrExt(BufferDesc *desc, uint64 old_buf_state,
+				uint64 set_bits, uint64 unset_bits,
 				int refcount_change)
 {
 	for (;;)
 	{
-		uint32		buf_state = old_buf_state;
+		uint64		buf_state = old_buf_state;
 
 		Assert(buf_state & BM_LOCKED);
 
@@ -457,7 +460,7 @@ UnlockBufHdrExt(BufferDesc *desc, uint32 old_buf_state,
 		if (refcount_change != 0)
 			buf_state += BUF_REFCOUNT_ONE * refcount_change;
 
-		if (pg_atomic_compare_exchange_u32(&desc->state, &old_buf_state,
+		if (pg_atomic_compare_exchange_u64(&desc->state, &old_buf_state,
 										   buf_state))
 		{
 			return old_buf_state;
@@ -465,7 +468,7 @@ UnlockBufHdrExt(BufferDesc *desc, uint32 old_buf_state,
 	}
 }
 
-extern uint32 WaitBufHdrUnlocked(BufferDesc *buf);
+extern uint64 WaitBufHdrUnlocked(BufferDesc *buf);
 
 /* in bufmgr.c */
 
@@ -525,14 +528,14 @@ extern void TrackNewBufferPin(Buffer buf);
 
 /* solely to make it easier to write tests */
 extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
-extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
+extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
 							  bool forget_owner, bool release_aio);
 
 
 /* freelist.c */
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
-									 uint32 *buf_state, bool *from_ring);
+									 uint64 *buf_state, bool *from_ring);
 extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
 								 BufferDesc *buf, bool from_ring);
 
@@ -568,7 +571,7 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 										  uint32 *extended_by);
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
-								   uint32 set_flag_bits, bool release_aio);
+								   uint64 set_flag_bits, bool release_aio);
 extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
diff --git a/src/include/storage/procnumber.h b/src/include/storage/procnumber.h
index 30c360ad350..bd9cb3891cc 100644
--- a/src/include/storage/procnumber.h
+++ b/src/include/storage/procnumber.h
@@ -27,13 +27,13 @@ typedef int ProcNumber;
 
 /*
  * Note: MAX_BACKENDS_BITS is 18 as that is the space available for buffer
- * refcounts in buf_internals.h.  This limitation could be lifted by using a
- * 64bit state; but it's unlikely to be worthwhile as 2^18-1 backends exceed
- * currently realistic configurations. Even if that limitation were removed,
- * we still could not a) exceed 2^23-1 because inval.c stores the ProcNumber
- * as a 3-byte signed integer, b) INT_MAX/4 because some places compute
- * 4*MaxBackends without any overflow check.  We check that the configured
- * number of backends does not exceed MAX_BACKENDS in InitializeMaxBackends().
+ * refcounts in buf_internals.h.  This limitation could be lifted, but it's
+ * unlikely to be worthwhile as 2^18-1 backends exceed currently realistic
+ * configurations. Even if that limitation were removed, we still could not a)
+ * exceed 2^23-1 because inval.c stores the ProcNumber as a 3-byte signed
+ * integer, b) INT_MAX/4 because some places compute 4*MaxBackends without any
+ * overflow check.  We check that the configured number of backends does not
+ * exceed MAX_BACKENDS in InitializeMaxBackends().
  */
 #define MAX_BACKENDS_BITS		18
 #define MAX_BACKENDS			((1U << MAX_BACKENDS_BITS)-1)
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 9a312bcc7b3..7d894522526 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -121,7 +121,7 @@ BufferManagerShmemInit(void)
 
 			ClearBufferTag(&buf->tag);
 
-			pg_atomic_init_u32(&buf->state, 0);
+			pg_atomic_init_u64(&buf->state, 0);
 			buf->wait_backend_pgprocno = INVALID_PROC_NUMBER;
 
 			buf->buf_id = i;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a036c2aa275..b0de8e45d4d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -780,7 +780,7 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN
 {
 	BufferDesc *bufHdr;
 	BufferTag	tag;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	Assert(BufferIsValid(recent_buffer));
 
@@ -793,7 +793,7 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN
 		int			b = -recent_buffer - 1;
 
 		bufHdr = GetLocalBufferDescriptor(b);
-		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 		/* Is it still valid and holding the right tag? */
 		if ((buf_state & BM_VALID) && BufferTagsEqual(&tag, &bufHdr->tag))
@@ -1386,8 +1386,8 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 				bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
 			else
 				bufHdr = GetBufferDescriptor(buffers[i] - 1);
-			Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID);
-			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+			Assert(pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID);
+			found = pg_atomic_read_u64(&bufHdr->state) & BM_VALID;
 		}
 		else
 		{
@@ -1613,10 +1613,10 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 			GetBufferDescriptor(buffer - 1);
 
 		Assert(BufferGetBlockNumber(buffer) == operation->blocknum + i);
-		Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_TAG_VALID);
+		Assert(pg_atomic_read_u64(&buf_hdr->state) & BM_TAG_VALID);
 
 		if (i < operation->nblocks_done)
-			Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_VALID);
+			Assert(pg_atomic_read_u64(&buf_hdr->state) & BM_VALID);
 	}
 #endif
 }
@@ -2083,8 +2083,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	int			existing_buf_id;
 	Buffer		victim_buffer;
 	BufferDesc *victim_buf_hdr;
-	uint32		victim_buf_state;
-	uint32		set_bits = 0;
+	uint64		victim_buf_state;
+	uint64		set_bits = 0;
 
 	/* Make sure we will have room to remember the buffer pin */
 	ResourceOwnerEnlarge(CurrentResourceOwner);
@@ -2251,7 +2251,7 @@ InvalidateBuffer(BufferDesc *buf)
 	uint32		oldHash;		/* hash value for oldTag */
 	LWLock	   *oldPartitionLock;	/* buffer partition lock for it */
 	uint32		oldFlags;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	/* Save the original buffer tag before dropping the spinlock */
 	oldTag = buf->tag;
@@ -2342,7 +2342,7 @@ retry:
 static bool
 InvalidateVictimBuffer(BufferDesc *buf_hdr)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 	uint32		hash;
 	LWLock	   *partition_lock;
 	BufferTag	tag;
@@ -2402,10 +2402,10 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
 
 	LWLockRelease(partition_lock);
 
-	buf_state = pg_atomic_read_u32(&buf_hdr->state);
+	buf_state = pg_atomic_read_u64(&buf_hdr->state);
 	Assert(!(buf_state & (BM_DIRTY | BM_VALID | BM_TAG_VALID)));
 	Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
-	Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf_hdr->state)) > 0);
+	Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u64(&buf_hdr->state)) > 0);
 
 	return true;
 }
@@ -2415,7 +2415,7 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 {
 	BufferDesc *buf_hdr;
 	Buffer		buf;
-	uint32		buf_state;
+	uint64		buf_state;
 	bool		from_ring;
 
 	/*
@@ -2548,7 +2548,7 @@ again:
 
 	/* a final set of sanity checks */
 #ifdef USE_ASSERT_CHECKING
-	buf_state = pg_atomic_read_u32(&buf_hdr->state);
+	buf_state = pg_atomic_read_u64(&buf_hdr->state);
 
 	Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
 	Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
@@ -2839,13 +2839,13 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			 */
 			do
 			{
-				pg_atomic_fetch_and_u32(&existing_hdr->state, ~BM_VALID);
+				pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
 			} while (!StartBufferIO(existing_hdr, true, false));
 		}
 		else
 		{
-			uint32		buf_state;
-			uint32		set_bits = 0;
+			uint64		buf_state;
+			uint64		set_bits = 0;
 
 			buf_state = LockBufHdr(victim_buf_hdr);
 
@@ -3021,7 +3021,7 @@ BufferIsDirty(Buffer buffer)
 		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
 	}
 
-	return pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY;
+	return pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY;
 }
 
 /*
@@ -3037,8 +3037,8 @@ void
 MarkBufferDirty(Buffer buffer)
 {
 	BufferDesc *bufHdr;
-	uint32		buf_state;
-	uint32		old_buf_state;
+	uint64		buf_state;
+	uint64		old_buf_state;
 
 	if (!BufferIsValid(buffer))
 		elog(ERROR, "bad buffer ID: %d", buffer);
@@ -3058,7 +3058,7 @@ MarkBufferDirty(Buffer buffer)
 	 * NB: We have to wait for the buffer header spinlock to be not held, as
 	 * TerminateBufferIO() relies on the spinlock.
 	 */
-	old_buf_state = pg_atomic_read_u32(&bufHdr->state);
+	old_buf_state = pg_atomic_read_u64(&bufHdr->state);
 	for (;;)
 	{
 		if (old_buf_state & BM_LOCKED)
@@ -3069,7 +3069,7 @@ MarkBufferDirty(Buffer buffer)
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 		buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
 
-		if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
+		if (pg_atomic_compare_exchange_u64(&bufHdr->state, &old_buf_state,
 										   buf_state))
 			break;
 	}
@@ -3173,10 +3173,10 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
 
 	if (ref == NULL)
 	{
-		uint32		buf_state;
-		uint32		old_buf_state;
+		uint64		buf_state;
+		uint64		old_buf_state;
 
-		old_buf_state = pg_atomic_read_u32(&buf->state);
+		old_buf_state = pg_atomic_read_u64(&buf->state);
 		for (;;)
 		{
 			if (unlikely(skip_if_not_valid && !(old_buf_state & BM_VALID)))
@@ -3210,7 +3210,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
 					buf_state += BUF_USAGECOUNT_ONE;
 			}
 
-			if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+			if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state,
 											   buf_state))
 			{
 				result = (buf_state & BM_VALID) != 0;
@@ -3237,7 +3237,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
+		result = (pg_atomic_read_u64(&buf->state) & BM_VALID) != 0;
 
 		Assert(ref->data.refcount > 0);
 		ref->data.refcount++;
@@ -3272,7 +3272,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
 static void
 PinBuffer_Locked(BufferDesc *buf)
 {
-	uint32		old_buf_state;
+	uint64		old_buf_state;
 
 	/*
 	 * As explained, We don't expect any preexisting pins. That allows us to
@@ -3284,7 +3284,7 @@ PinBuffer_Locked(BufferDesc *buf)
 	 * Since we hold the buffer spinlock, we can update the buffer state and
 	 * release the lock in one operation.
 	 */
-	old_buf_state = pg_atomic_read_u32(&buf->state);
+	old_buf_state = pg_atomic_read_u64(&buf->state);
 
 	UnlockBufHdrExt(buf, old_buf_state,
 					0, 0, 1);
@@ -3314,7 +3314,7 @@ WakePinCountWaiter(BufferDesc *buf)
 	 * BM_PIN_COUNT_WAITER if it stops waiting for a reason other than this
 	 * backend waking it up.
 	 */
-	uint32		buf_state = LockBufHdr(buf);
+	uint64		buf_state = LockBufHdr(buf);
 
 	if ((buf_state & BM_PIN_COUNT_WAITER) &&
 		BUF_STATE_GET_REFCOUNT(buf_state) == 1)
@@ -3361,7 +3361,7 @@ UnpinBufferNoOwner(BufferDesc *buf)
 	ref->data.refcount--;
 	if (ref->data.refcount == 0)
 	{
-		uint32		old_buf_state;
+		uint64		old_buf_state;
 
 		/*
 		 * Mark buffer non-accessible to Valgrind.
@@ -3379,7 +3379,7 @@ UnpinBufferNoOwner(BufferDesc *buf)
 		Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
 
 		/* decrement the shared reference count */
-		old_buf_state = pg_atomic_fetch_sub_u32(&buf->state, BUF_REFCOUNT_ONE);
+		old_buf_state = pg_atomic_fetch_sub_u64(&buf->state, BUF_REFCOUNT_ONE);
 
 		/* Support LockBufferForCleanup() */
 		if (old_buf_state & BM_PIN_COUNT_WAITER)
@@ -3436,7 +3436,7 @@ TrackNewBufferPin(Buffer buf)
 static void
 BufferSync(int flags)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 	int			buf_id;
 	int			num_to_scan;
 	int			num_spaces;
@@ -3446,7 +3446,7 @@ BufferSync(int flags)
 	Oid			last_tsid;
 	binaryheap *ts_heap;
 	int			i;
-	uint32		mask = BM_DIRTY;
+	uint64		mask = BM_DIRTY;
 	WritebackContext wb_context;
 
 	/*
@@ -3478,7 +3478,7 @@ BufferSync(int flags)
 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
-		uint32		set_bits = 0;
+		uint64		set_bits = 0;
 
 		/*
 		 * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3645,7 +3645,7 @@ BufferSync(int flags)
 		 * write the buffer though we didn't need to.  It doesn't seem worth
 		 * guarding against this, though.
 		 */
-		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
+		if (pg_atomic_read_u64(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
 			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
 			{
@@ -4015,7 +4015,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 {
 	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 	int			result = 0;
-	uint32		buf_state;
+	uint64		buf_state;
 	BufferTag	tag;
 
 	/* Make sure we can handle the pin */
@@ -4264,7 +4264,7 @@ DebugPrintBufferRefcount(Buffer buffer)
 	int32		loccount;
 	char	   *result;
 	ProcNumber	backend;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	Assert(BufferIsValid(buffer));
 	if (BufferIsLocal(buffer))
@@ -4281,9 +4281,9 @@ DebugPrintBufferRefcount(Buffer buffer)
 	}
 
 	/* theoretically we should lock the bufHdr here */
-	buf_state = pg_atomic_read_u32(&buf->state);
+	buf_state = pg_atomic_read_u64(&buf->state);
 
-	result = psprintf("[%03d] (rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
+	result = psprintf("[%03d] (rel=%s, blockNum=%u, flags=0x%" PRIx64 ", refcount=%u %d)",
 					  buffer,
 					  relpathbackend(BufTagGetRelFileLocator(&buf->tag), backend,
 									 BufTagGetForkNum(&buf->tag)).str,
@@ -4383,7 +4383,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	instr_time	io_start;
 	Block		bufBlock;
 	char	   *bufToWrite;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	/*
 	 * Try to start an I/O operation.  If StartBufferIO returns false, then
@@ -4581,7 +4581,7 @@ BufferIsPermanent(Buffer buffer)
 	 * not random garbage.
 	 */
 	bufHdr = GetBufferDescriptor(buffer - 1);
-	return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
+	return (pg_atomic_read_u64(&bufHdr->state) & BM_PERMANENT) != 0;
 }
 
 /*
@@ -5044,11 +5044,11 @@ FlushRelationBuffers(Relation rel)
 	{
 		for (i = 0; i < NLocBuffer; i++)
 		{
-			uint32		buf_state;
+			uint64		buf_state;
 
 			bufHdr = GetLocalBufferDescriptor(i);
 			if (BufTagMatchesRelFileLocator(&bufHdr->tag, &rel->rd_locator) &&
-				((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
+				((buf_state = pg_atomic_read_u64(&bufHdr->state)) &
 				 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 			{
 				ErrorContextCallback errcallback;
@@ -5084,7 +5084,7 @@ FlushRelationBuffers(Relation rel)
 
 	for (i = 0; i < NBuffers; i++)
 	{
-		uint32		buf_state;
+		uint64		buf_state;
 
 		bufHdr = GetBufferDescriptor(i);
 
@@ -5156,7 +5156,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
 	{
 		SMgrSortArray *srelent = NULL;
 		BufferDesc *bufHdr = GetBufferDescriptor(i);
-		uint32		buf_state;
+		uint64		buf_state;
 
 		/*
 		 * As in DropRelationBuffers, an unlocked precheck should be safe and
@@ -5405,7 +5405,7 @@ FlushDatabaseBuffers(Oid dbid)
 
 	for (i = 0; i < NBuffers; i++)
 	{
-		uint32		buf_state;
+		uint64		buf_state;
 
 		bufHdr = GetBufferDescriptor(i);
 
@@ -5553,13 +5553,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	 * is only intended to be used in cases where failing to write out the
 	 * data would be harmless anyway, it doesn't really matter.
 	 */
-	if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+	if ((pg_atomic_read_u64(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
 		(BM_DIRTY | BM_JUST_DIRTIED))
 	{
 		XLogRecPtr	lsn = InvalidXLogRecPtr;
 		bool		dirtied = false;
 		bool		delayChkptFlags = false;
-		uint32		buf_state;
+		uint64		buf_state;
 
 		/*
 		 * If we need to protect hint bit updates from torn writes, WAL-log a
@@ -5571,7 +5571,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		 * when we call XLogInsert() since the value changes dynamically.
 		 */
 		if (XLogHintBitIsNeeded() &&
-			(pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
+			(pg_atomic_read_u64(&bufHdr->state) & BM_PERMANENT))
 		{
 			/*
 			 * If we must not write WAL, due to a relfilelocator-specific
@@ -5671,8 +5671,8 @@ UnlockBuffers(void)
 
 	if (buf)
 	{
-		uint32		buf_state;
-		uint32		unset_bits = 0;
+		uint64		buf_state;
+		uint64		unset_bits = 0;
 
 		buf_state = LockBufHdr(buf);
 
@@ -5803,8 +5803,8 @@ LockBufferForCleanup(Buffer buffer)
 
 	for (;;)
 	{
-		uint32		buf_state;
-		uint32		unset_bits = 0;
+		uint64		buf_state;
+		uint64		unset_bits = 0;
 
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -5952,7 +5952,7 @@ bool
 ConditionalLockBufferForCleanup(Buffer buffer)
 {
 	BufferDesc *bufHdr;
-	uint32		buf_state,
+	uint64		buf_state,
 				refcount;
 
 	Assert(BufferIsValid(buffer));
@@ -6010,7 +6010,7 @@ bool
 IsBufferCleanupOK(Buffer buffer)
 {
 	BufferDesc *bufHdr;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	Assert(BufferIsValid(buffer));
 
@@ -6066,7 +6066,7 @@ WaitIO(BufferDesc *buf)
 	ConditionVariablePrepareToSleep(cv);
 	for (;;)
 	{
-		uint32		buf_state;
+		uint64		buf_state;
 		PgAioWaitRef iow;
 
 		/*
@@ -6140,7 +6140,7 @@ WaitIO(BufferDesc *buf)
 bool
 StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 
 	ResourceOwnerEnlarge(CurrentResourceOwner);
 
@@ -6196,11 +6196,11 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
  * is being released)
  */
 void
-TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
+TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
 				  bool forget_owner, bool release_aio)
 {
-	uint32		buf_state;
-	uint32		unset_flag_bits = 0;
+	uint64		buf_state;
+	uint64		unset_flag_bits = 0;
 	int			refcount_change = 0;
 
 	buf_state = LockBufHdr(buf);
@@ -6261,7 +6261,7 @@ static void
 AbortBufferIO(Buffer buffer)
 {
 	BufferDesc *buf_hdr = GetBufferDescriptor(buffer - 1);
-	uint32		buf_state;
+	uint64		buf_state;
 
 	buf_state = LockBufHdr(buf_hdr);
 	Assert(buf_state & (BM_IO_IN_PROGRESS | BM_TAG_VALID));
@@ -6355,10 +6355,10 @@ rlocator_comparator(const void *p1, const void *p2)
 /*
  * Lock buffer header - set BM_LOCKED in buffer state.
  */
-uint32
+uint64
 LockBufHdr(BufferDesc *desc)
 {
-	uint32		old_buf_state;
+	uint64		old_buf_state;
 
 	Assert(!BufferIsLocal(BufferDescriptorGetBuffer(desc)));
 
@@ -6369,7 +6369,7 @@ LockBufHdr(BufferDesc *desc)
 		 * the spin-delay infrastructure. The work necessary for that shows up
 		 * in profiles and is rarely necessary.
 		 */
-		old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
+		old_buf_state = pg_atomic_fetch_or_u64(&desc->state, BM_LOCKED);
 		if (likely(!(old_buf_state & BM_LOCKED)))
 			break;				/* got lock */
 
@@ -6382,7 +6382,7 @@ LockBufHdr(BufferDesc *desc)
 			while (old_buf_state & BM_LOCKED)
 			{
 				perform_spin_delay(&delayStatus);
-				old_buf_state = pg_atomic_read_u32(&desc->state);
+				old_buf_state = pg_atomic_read_u64(&desc->state);
 			}
 			finish_spin_delay(&delayStatus);
 		}
@@ -6403,20 +6403,20 @@ LockBufHdr(BufferDesc *desc)
  * Obviously the buffer could be locked by the time the value is returned, so
  * this is primarily useful in CAS style loops.
  */
-pg_noinline uint32
+pg_noinline uint64
 WaitBufHdrUnlocked(BufferDesc *buf)
 {
 	SpinDelayStatus delayStatus;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	init_local_spin_delay(&delayStatus);
 
-	buf_state = pg_atomic_read_u32(&buf->state);
+	buf_state = pg_atomic_read_u64(&buf->state);
 
 	while (buf_state & BM_LOCKED)
 	{
 		perform_spin_delay(&delayStatus);
-		buf_state = pg_atomic_read_u32(&buf->state);
+		buf_state = pg_atomic_read_u64(&buf->state);
 	}
 
 	finish_spin_delay(&delayStatus);
@@ -6704,12 +6704,12 @@ ResOwnerPrintBufferPin(Datum res)
 static bool
 EvictUnpinnedBufferInternal(BufferDesc *desc, bool *buffer_flushed)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 	bool		result;
 
 	*buffer_flushed = false;
 
-	buf_state = pg_atomic_read_u32(&(desc->state));
+	buf_state = pg_atomic_read_u64(&(desc->state));
 	Assert(buf_state & BM_LOCKED);
 
 	if ((buf_state & BM_VALID) == 0)
@@ -6803,12 +6803,12 @@ EvictAllUnpinnedBuffers(int32 *buffers_evicted, int32 *buffers_flushed,
 	for (int buf = 1; buf <= NBuffers; buf++)
 	{
 		BufferDesc *desc = GetBufferDescriptor(buf - 1);
-		uint32		buf_state;
+		uint64		buf_state;
 		bool		buffer_flushed;
 
 		CHECK_FOR_INTERRUPTS();
 
-		buf_state = pg_atomic_read_u32(&desc->state);
+		buf_state = pg_atomic_read_u64(&desc->state);
 		if (!(buf_state & BM_VALID))
 			continue;
 
@@ -6855,7 +6855,7 @@ EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted,
 	for (int buf = 1; buf <= NBuffers; buf++)
 	{
 		BufferDesc *desc = GetBufferDescriptor(buf - 1);
-		uint32		buf_state = pg_atomic_read_u32(&(desc->state));
+		uint64		buf_state = pg_atomic_read_u64(&(desc->state));
 		bool		buffer_flushed;
 
 		CHECK_FOR_INTERRUPTS();
@@ -6897,12 +6897,12 @@ static bool
 MarkDirtyUnpinnedBufferInternal(Buffer buf, BufferDesc *desc,
 								bool *buffer_already_dirty)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 	bool		result = false;
 
 	*buffer_already_dirty = false;
 
-	buf_state = pg_atomic_read_u32(&(desc->state));
+	buf_state = pg_atomic_read_u64(&(desc->state));
 	Assert(buf_state & BM_LOCKED);
 
 	if ((buf_state & BM_VALID) == 0)
@@ -7000,7 +7000,7 @@ MarkDirtyRelUnpinnedBuffers(Relation rel,
 	for (int buf = 1; buf <= NBuffers; buf++)
 	{
 		BufferDesc *desc = GetBufferDescriptor(buf - 1);
-		uint32		buf_state = pg_atomic_read_u32(&(desc->state));
+		uint64		buf_state = pg_atomic_read_u64(&(desc->state));
 		bool		buffer_already_dirty;
 
 		CHECK_FOR_INTERRUPTS();
@@ -7054,12 +7054,12 @@ MarkDirtyAllUnpinnedBuffers(int32 *buffers_dirtied,
 	for (int buf = 1; buf <= NBuffers; buf++)
 	{
 		BufferDesc *desc = GetBufferDescriptor(buf - 1);
-		uint32		buf_state;
+		uint64		buf_state;
 		bool		buffer_already_dirty;
 
 		CHECK_FOR_INTERRUPTS();
 
-		buf_state = pg_atomic_read_u32(&desc->state);
+		buf_state = pg_atomic_read_u64(&desc->state);
 		if (!(buf_state & BM_VALID))
 			continue;
 
@@ -7110,7 +7110,7 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
 		BufferDesc *buf_hdr = is_temp ?
 			GetLocalBufferDescriptor(-buffer - 1)
 			: GetBufferDescriptor(buffer - 1);
-		uint32		buf_state;
+		uint64		buf_state;
 
 		/*
 		 * Check that all the buffers are actually ones that could conceivably
@@ -7128,7 +7128,7 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
 		}
 
 		if (is_temp)
-			buf_state = pg_atomic_read_u32(&buf_hdr->state);
+			buf_state = pg_atomic_read_u64(&buf_hdr->state);
 		else
 			buf_state = LockBufHdr(buf_hdr);
 
@@ -7166,7 +7166,7 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
 		if (is_temp)
 		{
 			buf_state += BUF_REFCOUNT_ONE;
-			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+			pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
 		}
 		else
 			UnlockBufHdrExt(buf_hdr, buf_state, 0, 0, 1);
@@ -7352,13 +7352,13 @@ buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
 		: GetBufferDescriptor(buffer - 1);
 	BufferTag	tag = buf_hdr->tag;
 	char	   *bufdata = BufferGetBlock(buffer);
-	uint32		set_flag_bits;
+	uint64		set_flag_bits;
 	int			piv_flags;
 
 	/* check that the buffer is in the expected state for a read */
 #ifdef USE_ASSERT_CHECKING
 	{
-		uint32		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		uint64		buf_state = pg_atomic_read_u64(&buf_hdr->state);
 
 		Assert(buf_state & BM_TAG_VALID);
 		Assert(!(buf_state & BM_VALID));
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 9a93fb335fc..b7687836188 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -86,7 +86,7 @@ typedef struct BufferAccessStrategyData
 
 /* Prototypes for internal functions */
 static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy,
-									 uint32 *buf_state);
+									 uint64 *buf_state);
 static void AddBufferToRing(BufferAccessStrategy strategy,
 							BufferDesc *buf);
 
@@ -171,7 +171,7 @@ ClockSweepTick(void)
  *	before returning.
  */
 BufferDesc *
-StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring)
+StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_ring)
 {
 	BufferDesc *buf;
 	int			bgwprocno;
@@ -230,8 +230,8 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	trycounter = NBuffers;
 	for (;;)
 	{
-		uint32		old_buf_state;
-		uint32		local_buf_state;
+		uint64		old_buf_state;
+		uint64		local_buf_state;
 
 		buf = GetBufferDescriptor(ClockSweepTick());
 
@@ -239,7 +239,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 		 * Check whether the buffer can be used and pin it if so. Do this
 		 * using a CAS loop, to avoid having to lock the buffer header.
 		 */
-		old_buf_state = pg_atomic_read_u32(&buf->state);
+		old_buf_state = pg_atomic_read_u64(&buf->state);
 		for (;;)
 		{
 			local_buf_state = old_buf_state;
@@ -277,7 +277,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 			{
 				local_buf_state -= BUF_USAGECOUNT_ONE;
 
-				if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+				if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state,
 												   local_buf_state))
 				{
 					trycounter = NBuffers;
@@ -289,7 +289,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 				/* pin the buffer if the CAS succeeds */
 				local_buf_state += BUF_REFCOUNT_ONE;
 
-				if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+				if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state,
 												   local_buf_state))
 				{
 					/* Found a usable buffer */
@@ -655,12 +655,12 @@ FreeAccessStrategy(BufferAccessStrategy strategy)
  * returning.
  */
 static BufferDesc *
-GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
+GetBufferFromRing(BufferAccessStrategy strategy, uint64 *buf_state)
 {
 	BufferDesc *buf;
 	Buffer		bufnum;
-	uint32		old_buf_state;
-	uint32		local_buf_state;	/* to avoid repeated (de-)referencing */
+	uint64		old_buf_state;
+	uint64		local_buf_state;	/* to avoid repeated (de-)referencing */
 
 
 	/* Advance to next ring slot */
@@ -682,7 +682,7 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 	 * Check whether the buffer can be used and pin it if so. Do this using a
 	 * CAS loop, to avoid having to lock the buffer header.
 	 */
-	old_buf_state = pg_atomic_read_u32(&buf->state);
+	old_buf_state = pg_atomic_read_u64(&buf->state);
 	for (;;)
 	{
 		local_buf_state = old_buf_state;
@@ -710,7 +710,7 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 		/* pin the buffer if the CAS succeeds */
 		local_buf_state += BUF_REFCOUNT_ONE;
 
-		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+		if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state,
 										   local_buf_state))
 		{
 			*buf_state = local_buf_state;
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index f6e2b1aa288..04a540379a2 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -148,7 +148,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	}
 	else
 	{
-		uint32		buf_state;
+		uint64		buf_state;
 
 		victim_buffer = GetLocalVictimBuffer();
 		bufid = -victim_buffer - 1;
@@ -165,10 +165,10 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 		 */
 		bufHdr->tag = newTag;
 
-		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state = pg_atomic_read_u64(&bufHdr->state);
 		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
 		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+		pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
 
 		*foundPtr = false;
 	}
@@ -245,12 +245,12 @@ GetLocalVictimBuffer(void)
 
 		if (LocalRefCount[victim_bufid] == 0)
 		{
-			uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+			uint64		buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
 			{
 				buf_state -= BUF_USAGECOUNT_ONE;
-				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+				pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
 				trycounter = NLocBuffer;
 			}
 			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
@@ -286,13 +286,13 @@ GetLocalVictimBuffer(void)
 	 * this buffer is not referenced but it might still be dirty. if that's
 	 * the case, write it out before reusing it!
 	 */
-	if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
+	if (pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY)
 		FlushLocalBuffer(bufHdr, NULL);
 
 	/*
 	 * Remove the victim buffer from the hashtable and mark as invalid.
 	 */
-	if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
+	if (pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID)
 	{
 		InvalidateLocalBuffer(bufHdr, false);
 
@@ -417,7 +417,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 		if (found)
 		{
 			BufferDesc *existing_hdr;
-			uint32		buf_state;
+			uint64		buf_state;
 
 			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
 
@@ -428,18 +428,18 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			/*
 			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
 			 */
-			buf_state = pg_atomic_read_u32(&existing_hdr->state);
+			buf_state = pg_atomic_read_u64(&existing_hdr->state);
 			Assert(buf_state & BM_TAG_VALID);
 			Assert(!(buf_state & BM_DIRTY));
 			buf_state &= ~BM_VALID;
-			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
+			pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
 
 			/* no need to loop for local buffers */
 			StartLocalBufferIO(existing_hdr, true, false);
 		}
 		else
 		{
-			uint32		buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
+			uint64		buf_state = pg_atomic_read_u64(&victim_buf_hdr->state);
 
 			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
 
@@ -447,7 +447,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 
 			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
 
-			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
+			pg_atomic_unlocked_write_u64(&victim_buf_hdr->state, buf_state);
 
 			hresult->id = victim_buf_id;
 
@@ -467,13 +467,13 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 	{
 		Buffer		buf = buffers[i];
 		BufferDesc *buf_hdr;
-		uint32		buf_state;
+		uint64		buf_state;
 
 		buf_hdr = GetLocalBufferDescriptor(-buf - 1);
 
-		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		buf_state = pg_atomic_read_u64(&buf_hdr->state);
 		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
 	}
 
 	*extended_by = extend_by;
@@ -492,7 +492,7 @@ MarkLocalBufferDirty(Buffer buffer)
 {
 	int			bufid;
 	BufferDesc *bufHdr;
-	uint32		buf_state;
+	uint64		buf_state;
 
 	Assert(BufferIsLocal(buffer));
 
@@ -506,14 +506,14 @@ MarkLocalBufferDirty(Buffer buffer)
 
 	bufHdr = GetLocalBufferDescriptor(bufid);
 
-	buf_state = pg_atomic_read_u32(&bufHdr->state);
+	buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 	if (!(buf_state & BM_DIRTY))
 		pgBufferUsage.local_blks_dirtied++;
 
 	buf_state |= BM_DIRTY;
 
-	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
 }
 
 /*
@@ -522,7 +522,7 @@ MarkLocalBufferDirty(Buffer buffer)
 bool
 StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 
 	/*
 	 * With AIO the buffer could have IO in progress, e.g. when there are two
@@ -542,7 +542,7 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
 	/* Once we get here, there is definitely no I/O active on this buffer */
 
 	/* Check if someone else already did the I/O */
-	buf_state = pg_atomic_read_u32(&bufHdr->state);
+	buf_state = pg_atomic_read_u64(&bufHdr->state);
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
 		return false;
@@ -559,11 +559,11 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
  * Like TerminateBufferIO, but for local buffers
  */
 void
-TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
+TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits,
 					   bool release_aio)
 {
 	/* Only need to adjust flags */
-	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+	uint64		buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
 
@@ -582,7 +582,7 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit
 	}
 
 	buf_state |= set_flag_bits;
-	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
 
 	/* local buffers don't track IO using resowners */
 
@@ -606,7 +606,7 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
 {
 	Buffer		buffer = BufferDescriptorGetBuffer(bufHdr);
 	int			bufid = -buffer - 1;
-	uint32		buf_state;
+	uint64		buf_state;
 	LocalBufferLookupEnt *hresult;
 
 	/*
@@ -622,7 +622,7 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
 		Assert(!pgaio_wref_valid(&bufHdr->io_wref));
 	}
 
-	buf_state = pg_atomic_read_u32(&bufHdr->state);
+	buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 	/*
 	 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
@@ -647,7 +647,7 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
 	ClearBufferTag(&bufHdr->tag);
 	buf_state &= ~BUF_FLAG_MASK;
 	buf_state &= ~BUF_USAGECOUNT_MASK;
-	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
 }
 
 /*
@@ -671,9 +671,9 @@ DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
 	for (i = 0; i < NLocBuffer; i++)
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
-		uint32		buf_state;
+		uint64		buf_state;
 
-		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 		if (!(buf_state & BM_TAG_VALID) ||
 			!BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
@@ -706,9 +706,9 @@ DropRelationAllLocalBuffers(RelFileLocator rlocator)
 	for (i = 0; i < NLocBuffer; i++)
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
-		uint32		buf_state;
+		uint64		buf_state;
 
-		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 		if ((buf_state & BM_TAG_VALID) &&
 			BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
@@ -804,11 +804,11 @@ InitLocalBuffers(void)
 bool
 PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
 {
-	uint32		buf_state;
+	uint64		buf_state;
 	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
 	int			bufid = -buffer - 1;
 
-	buf_state = pg_atomic_read_u32(&buf_hdr->state);
+	buf_state = pg_atomic_read_u64(&buf_hdr->state);
 
 	if (LocalRefCount[bufid] == 0)
 	{
@@ -819,7 +819,7 @@ PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
 		{
 			buf_state += BUF_USAGECOUNT_ONE;
 		}
-		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
 
 		/*
 		 * See comment in PinBuffer().
@@ -856,14 +856,14 @@ UnpinLocalBufferNoOwner(Buffer buffer)
 	if (--LocalRefCount[buffid] == 0)
 	{
 		BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
-		uint32		buf_state;
+		uint64		buf_state;
 
 		NLocalPinnedBuffers--;
 
-		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		buf_state = pg_atomic_read_u64(&buf_hdr->state);
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 		buf_state -= BUF_REFCOUNT_ONE;
-		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
 
 		/* see comment in UnpinBufferNoOwner */
 		VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c
index b682dca658b..dcba3fb5473 100644
--- a/contrib/pg_buffercache/pg_buffercache_pages.c
+++ b/contrib/pg_buffercache/pg_buffercache_pages.c
@@ -199,7 +199,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
 		for (i = 0; i < NBuffers; i++)
 		{
 			BufferDesc *bufHdr;
-			uint32		buf_state;
+			uint64		buf_state;
 
 			CHECK_FOR_INTERRUPTS();
 
@@ -615,7 +615,7 @@ pg_buffercache_summary(PG_FUNCTION_ARGS)
 	for (int i = 0; i < NBuffers; i++)
 	{
 		BufferDesc *bufHdr;
-		uint32		buf_state;
+		uint64		buf_state;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -626,7 +626,7 @@ pg_buffercache_summary(PG_FUNCTION_ARGS)
 		 * noticeably increase the cost of the function.
 		 */
 		bufHdr = GetBufferDescriptor(i);
-		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state = pg_atomic_read_u64(&bufHdr->state);
 
 		if (buf_state & BM_VALID)
 		{
@@ -676,7 +676,7 @@ pg_buffercache_usage_counts(PG_FUNCTION_ARGS)
 	for (int i = 0; i < NBuffers; i++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(i);
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		uint64		buf_state = pg_atomic_read_u64(&bufHdr->state);
 		int			usage_count;
 
 		CHECK_FOR_INTERRUPTS();
diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 3ca7d2ed772..89e187425cc 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -703,7 +703,7 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged)
 
 	for (num_blocks = 0, i = 0; i < NBuffers; i++)
 	{
-		uint32		buf_state;
+		uint64		buf_state;
 
 		CHECK_FOR_INTERRUPTS();
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index e046b08f3d5..b1aa8af9ec0 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -308,9 +308,9 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
 {
 	Buffer		buf;
 	BufferDesc *buf_hdr;
-	uint32		buf_state;
+	uint64		buf_state;
 	bool		was_pinned = false;
-	uint32		unset_bits = 0;
+	uint64		unset_bits = 0;
 
 	/* place buffer in shared buffers without erroring out */
 	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
@@ -319,7 +319,7 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
 	if (RelationUsesLocalBuffers(rel))
 	{
 		buf_hdr = GetLocalBufferDescriptor(-buf - 1);
-		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		buf_state = pg_atomic_read_u64(&buf_hdr->state);
 	}
 	else
 	{
@@ -340,7 +340,7 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
 	if (RelationUsesLocalBuffers(rel))
 	{
 		buf_state &= ~unset_bits;
-		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
 	}
 	else
 	{
@@ -489,7 +489,7 @@ invalidate_rel_block(PG_FUNCTION_ARGS)
 
 			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
-			if (pg_atomic_read_u32(&buf_hdr->state) & BM_DIRTY)
+			if (pg_atomic_read_u64(&buf_hdr->state) & BM_DIRTY)
 			{
 				if (BufferIsLocal(buf))
 					FlushLocalBuffer(buf_hdr, NULL);
@@ -572,7 +572,7 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	bool		io_error = PG_GETARG_BOOL(3);
 	bool		release_aio = PG_GETARG_BOOL(4);
 	bool		clear_dirty = false;
-	uint32		set_flag_bits = 0;
+	uint64		set_flag_bits = 0;
 
 	if (io_error)
 		set_flag_bits |= BM_IO_ERROR;
-- 
2.48.1.76.g4e746b1a31.dirty



  [text/x-diff] v11-0002-bufmgr-Implement-buffer-content-locks-independen.patch (51.1K, 3-v11-0002-bufmgr-Implement-buffer-content-locks-independen.patch)
  download | inline diff:
From 2702c1de6afbc03eed4254482b82dfbb0299c821 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 13 Jan 2026 20:10:32 -0500
Subject: [PATCH v11 2/7] bufmgr: Implement buffer content locks independently
 of lwlocks

Until now buffer content locks were implemented using lwlocks. That has the
obvious advantage of not needing a separate efficient implementation of
locks. However, the time for a dedicated buffer content lock implementation
has come:

1) Hint bits are currently set while holding only a share lock. This leads to
   having to copy pages while they are being written out if checksums are
   enabled, which is not cheap. We would like to add AIO writes, however once
   many buffers can be written out at the same time, it gets a lot more
   expensive to copy them, particularly because that copy needs to reside in
   shared buffers (for worker mode to have access to the buffer).

   In addition, modifying buffers while they are being written out can cause
   issues with unbuffered/direct-IO, as some filesystems (like btrfs) do not
   like that, due to filesystem internal checksums getting corrupted.

   The solution to this is to require a new share-exclusive lock-level to set
   hint bits and to write out buffers, making those operations mutually
   exclusive. We could introduce such a lock-level into the generic lwlock
   implementation, however it does not look like there would be other users,
   and it does add some overhead into important code paths.

2) For AIO writes we need to be able to race-freely check whether a buffer is
   undergoing IO and whether an exclusive lock on the page can be acquired. That
   is rather hard to do efficiently when the buffer state and the lock state
   are separate atomic variables. This is a major hindrance to allowing writes
   to be done asynchronously.

3) Buffer locks are by far the most frequently taken locks. Optimizing them
   specifically for their use case is worth the effort. E.g. by merging
   content locks into buffer locks we will be able to release a buffer lock
   and pin in one atomic operation.

4) There are more complicated optimizations, like long-lived "super pinned &
   locked" pages, that cannot realistically be implemented with the generic
   lwlock implementation.

Therefore implement content locks inside bufmgr.c. The lockstate is stored as
part of BufferDesc.state. The implementation of buffer content locks is fairly
similar to lwlocks, with a few important differences:

1) An additional lock-level share-exclusive has been added. This lock-level
   conflicts with exclusive locks and itself, but not share locks.

2) Error recovery for content locks is implemented as part of the already
   existing private-refcount tracking mechanism in combination with resowners,
   instead of a bespoke mechanism as the case for lwlocks. This means we do
   not need to add dedicated error-recovery code paths to release all content
   locks (like done with LWLockReleaseAll() for lwlocks).

3) The lock state is embedded in BufferDesc.state instead of having its own
   struct.

4) The wakeup logic is a tad more complicated due to needing to support the
   additional lock-level

This commit unfortunately introduces some code that is very similar to the
code in lwlock.c, however the code is not equivalent enough to easily merge
it. The future wins that this commit makes possible seem worth the cost.

As of this commit nothing uses the new share-exclusive lock mode. It will be
used in a future commit. It seemed too complicated to introduce the lock-level
in a separate commit.

It's worth calling out one wart in this commit: Despite content locks not
being lwlocks anymore, they continue to use PGPROC->lw* - that seemed better
than duplicating the relevant infrastructure.

Another thing worth pointing out is that, after this change, content locks are
not reported as LWLock wait events anymore, but as new wait events in the
"Buffer" wait event class (see also 6c5c393b740). The old BufferContent lwlock
tranche has been removed.

Reviewed-by: Melanie Plageman <[email protected]>
Reviewed-by: Heikki Linnakangas <[email protected]>
Reviewed-by: Greg Burd <[email protected]>
Discussion: https://postgr.es/m/fvfmkr5kk4nyex56ejgxj3uzi63isfxovp2biecb4bspbjrze7@az2pljabhnff
---
 src/include/storage/buf_internals.h           |  73 +-
 src/include/storage/bufmgr.h                  |  32 +-
 src/include/storage/lwlocklist.h              |   1 -
 src/include/storage/proc.h                    |   8 +-
 src/backend/storage/buffer/buf_init.c         |   5 +-
 src/backend/storage/buffer/bufmgr.c           | 916 ++++++++++++++++--
 .../utils/activity/wait_event_names.txt       |   4 +-
 7 files changed, 934 insertions(+), 105 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index e6e788224f5..27f12502d19 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -23,6 +23,7 @@
 #include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/procnumber.h"
+#include "storage/proclist_types.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
@@ -35,22 +36,23 @@
  * State of the buffer itself (in order):
  * - 18 bits refcount
  * - 4 bits usage count
- * - 10 bits of flags
+ * - 12 bits of flags
+ * - 18 bits share-lock count
+ * - 1 bit share-exclusive locked
+ * - 1 bit exclusive locked
  *
  * Combining these values allows to perform some operations without locking
  * the buffer header, by modifying them together with a CAS loop.
  *
- * NB: A future commit will use a significant portion of the remaining bits to
- * implement buffer locking as part of the state variable.
- *
  * The definition of buffer state components is below.
  */
 #define BUF_REFCOUNT_BITS 18
 #define BUF_USAGECOUNT_BITS 4
-#define BUF_FLAG_BITS 10
+#define BUF_FLAG_BITS 12
+#define BUF_LOCK_BITS (18+2)
 
-StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
-				 "parts of buffer state space need to equal 32");
+StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS + BUF_LOCK_BITS <= 64,
+				 "parts of buffer state space need to be <= 64");
 
 /* refcount related definitions */
 #define BUF_REFCOUNT_ONE 1
@@ -71,6 +73,19 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
 #define BUF_FLAG_MASK \
 	(((UINT64CONST(1) << BUF_FLAG_BITS) - 1) << BUF_FLAG_SHIFT)
 
+/* lock state related definitions */
+#define BM_LOCK_SHIFT \
+	(BUF_FLAG_SHIFT + BUF_FLAG_BITS)
+#define BM_LOCK_VAL_SHARED \
+	(UINT64CONST(1) << (BM_LOCK_SHIFT))
+#define BM_LOCK_VAL_SHARE_EXCLUSIVE \
+	(UINT64CONST(1) << (BM_LOCK_SHIFT + MAX_BACKENDS_BITS))
+#define BM_LOCK_VAL_EXCLUSIVE \
+	(UINT64CONST(1) << (BM_LOCK_SHIFT + MAX_BACKENDS_BITS + 1))
+#define BM_LOCK_MASK \
+	((((uint64) MAX_BACKENDS) << BM_LOCK_SHIFT) | BM_LOCK_VAL_SHARE_EXCLUSIVE | BM_LOCK_VAL_EXCLUSIVE)
+
+
 /* Get refcount and usagecount from buffer state */
 #define BUF_STATE_GET_REFCOUNT(state) \
 	((uint32)((state) & BUF_REFCOUNT_MASK))
@@ -107,6 +122,17 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
 #define BM_CHECKPOINT_NEEDED		BUF_DEFINE_FLAG( 8)
 /* permanent buffer (not unlogged, or init fork) */
 #define BM_PERMANENT				BUF_DEFINE_FLAG( 9)
+/* content lock has waiters */
+#define BM_LOCK_HAS_WAITERS			BUF_DEFINE_FLAG(10)
+/* waiter for content lock has been signalled but not yet run */
+#define BM_LOCK_WAKE_IN_PROGRESS	BUF_DEFINE_FLAG(11)
+
+
+StaticAssertDecl(MAX_BACKENDS_BITS <= BUF_REFCOUNT_BITS,
+				 "MAX_BACKENDS_BITS needs to be <= BUF_REFCOUNT_BITS");
+StaticAssertDecl(MAX_BACKENDS_BITS <= (BUF_LOCK_BITS - 2),
+				 "MAX_BACKENDS_BITS needs to be <= BUF_LOCK_BITS - 2");
+
 
 /*
  * The maximum allowed value of usage_count represents a tradeoff between
@@ -120,8 +146,6 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
 
 StaticAssertDecl(BM_MAX_USAGE_COUNT < (UINT64CONST(1) << BUF_USAGECOUNT_BITS),
 				 "BM_MAX_USAGE_COUNT doesn't fit in BUF_USAGECOUNT_BITS bits");
-StaticAssertDecl(MAX_BACKENDS_BITS <= BUF_REFCOUNT_BITS,
-				 "MAX_BACKENDS_BITS needs to be <= BUF_REFCOUNT_BITS");
 
 /*
  * Buffer tag identifies which disk block the buffer contains.
@@ -265,9 +289,6 @@ BufMappingPartitionLockByIndex(uint32 index)
  * it is held.  However, existing buffer pins may be released while the buffer
  * header spinlock is held, using an atomic subtraction.
  *
- * The LWLock can take care of itself.  The buffer header lock is *not* used
- * to control access to the data in the buffer!
- *
  * If we have the buffer pinned, its tag can't change underneath us, so we can
  * examine the tag without locking the buffer header.  Also, in places we do
  * one-time reads of the flags without bothering to lock the buffer header;
@@ -280,6 +301,15 @@ BufMappingPartitionLockByIndex(uint32 index)
  * wait_backend_pgprocno and setting flag bit BM_PIN_COUNT_WAITER.  At present,
  * there can be only one such waiter per buffer.
  *
+ * The content of buffers is protected via the buffer content lock,
+ * implemented as part of the buffer state. Note that the buffer header lock
+ * is *not* used to control access to the data in the buffer! We used to use
+ * an LWLock to implement the content lock, but having a dedicated
+ * implementation of content locks allows us to implement some otherwise hard
+ * things (e.g. race-freely checking if AIO is in progress before locking a
+ * buffer exclusively) and enables otherwise impossible optimizations
+ * (e.g. unlocking and unpinning a buffer in one atomic operation).
+ *
  * We use this same struct for local buffer headers, but the locks are not
  * used and not all of the flag bits are useful either. To avoid unnecessary
  * overhead, manipulations of the state field should be done without actual
@@ -321,7 +351,12 @@ typedef struct BufferDesc
 	int			wait_backend_pgprocno;
 
 	PgAioWaitRef io_wref;		/* set iff AIO is in progress */
-	LWLock		content_lock;	/* to lock access to buffer contents */
+
+	/*
+	 * List of PGPROCs waiting for the buffer content lock. Protected by the
+	 * buffer header spinlock.
+	 */
+	proclist_head lock_waiters;
 } BufferDesc;
 
 /*
@@ -408,12 +443,6 @@ BufferDescriptorGetIOCV(const BufferDesc *bdesc)
 	return &(BufferIOCVArray[bdesc->buf_id]).cv;
 }
 
-static inline LWLock *
-BufferDescriptorGetContentLock(const BufferDesc *bdesc)
-{
-	return (LWLock *) (&bdesc->content_lock);
-}
-
 /*
  * Functions for acquiring/releasing a shared buffer header's spinlock.  Do
  * not apply these to local buffers!
@@ -491,18 +520,18 @@ extern PGDLLIMPORT CkptSortItem *CkptBufferIds;
 
 /* ResourceOwner callbacks to hold buffer I/Os and pins */
 extern PGDLLIMPORT const ResourceOwnerDesc buffer_io_resowner_desc;
-extern PGDLLIMPORT const ResourceOwnerDesc buffer_pin_resowner_desc;
+extern PGDLLIMPORT const ResourceOwnerDesc buffer_resowner_desc;
 
 /* Convenience wrappers over ResourceOwnerRemember/Forget */
 static inline void
 ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer)
 {
-	ResourceOwnerRemember(owner, Int32GetDatum(buffer), &buffer_pin_resowner_desc);
+	ResourceOwnerRemember(owner, Int32GetDatum(buffer), &buffer_resowner_desc);
 }
 static inline void
 ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
 {
-	ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_pin_resowner_desc);
+	ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_resowner_desc);
 }
 static inline void
 ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 715ae96f0f0..a40adf6b2a8 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -203,7 +203,20 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 typedef enum BufferLockMode
 {
 	BUFFER_LOCK_UNLOCK,
+
+	/*
+	 * A share lock conflicts with exclusive locks.
+	 */
 	BUFFER_LOCK_SHARE,
+
+	/*
+	 * A share-exclusive lock conflicts with itself and exclusive locks.
+	 */
+	BUFFER_LOCK_SHARE_EXCLUSIVE,
+
+	/*
+	 * An exclusive lock conflicts with every other lock type.
+	 */
 	BUFFER_LOCK_EXCLUSIVE,
 } BufferLockMode;
 
@@ -302,7 +315,24 @@ extern void BufferGetTag(Buffer buffer, RelFileLocator *rlocator,
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
 
 extern void UnlockBuffers(void);
-extern void LockBuffer(Buffer buffer, BufferLockMode mode);
+extern void UnlockBuffer(Buffer buffer);
+extern void LockBufferInternal(Buffer buffer, BufferLockMode mode);
+
+/*
+ * Handling BUFFER_LOCK_UNLOCK in bufmgr.c leads to sufficiently worse branch
+ * prediction to impact performance. Therefore handle that switch here, where
+ * most of the time `mode` will be a constant and thus can be optimized out by
+ * the compiler.
+ */
+static inline void
+LockBuffer(Buffer buffer, BufferLockMode mode)
+{
+	if (mode == BUFFER_LOCK_UNLOCK)
+		UnlockBuffer(buffer);
+	else
+		LockBufferInternal(buffer, mode);
+}
+
 extern bool ConditionalLockBuffer(Buffer buffer);
 extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 94f818b9f10..28c8c95c3f4 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -104,7 +104,6 @@ PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
 PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
 PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
 PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
-PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
 PG_LWLOCKTRANCHE(REPLICATION_ORIGIN_STATE, ReplicationOriginState)
 PG_LWLOCKTRANCHE(REPLICATION_SLOT_IO, ReplicationSlotIO)
 PG_LWLOCKTRANCHE(LOCK_FASTPATH, LockFastPath)
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index de7b2e0bd2c..039bc8353be 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -242,7 +242,13 @@ struct PGPROC
 	 */
 	bool		recoveryConflictPending;
 
-	/* Info about LWLock the process is currently waiting for, if any. */
+	/*
+	 * Info about LWLock the process is currently waiting for, if any.
+	 *
+	 * This is currently used both for lwlocks and buffer content locks, which
+	 * is acceptable, although not pretty, because a backend can't wait for
+	 * both types of locks at the same time.
+	 */
 	uint8		lwWaiting;		/* see LWLockWaitState */
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
 	proclist_node lwWaitLink;	/* position in LW lock wait list */
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 7d894522526..c0c223b2e32 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -17,6 +17,7 @@
 #include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
+#include "storage/proclist.h"
 
 BufferDescPadded *BufferDescriptors;
 char	   *BufferBlocks;
@@ -128,9 +129,7 @@ BufferManagerShmemInit(void)
 
 			pgaio_wref_clear(&buf->io_wref);
 
-			LWLockInitialize(BufferDescriptorGetContentLock(buf),
-							 LWTRANCHE_BUFFER_CONTENT);
-
+			proclist_init(&buf->lock_waiters);
 			ConditionVariableInit(BufferDescriptorGetIOCV(buf));
 		}
 	}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index b0de8e45d4d..6adf04903cb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -58,6 +58,7 @@
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
+#include "storage/proclist.h"
 #include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "storage/standby.h"
@@ -100,6 +101,12 @@ typedef struct PrivateRefCountData
 	 * How many times has the buffer been pinned by this backend.
 	 */
 	int32		refcount;
+
+	/*
+	 * Is the buffer locked by this backend? BUFFER_LOCK_UNLOCK indicates that
+	 * the buffer is not locked.
+	 */
+	BufferLockMode lockmode;
 } PrivateRefCountData;
 
 typedef struct PrivateRefCountEntry
@@ -210,8 +217,10 @@ static BufferDesc *PinCountWaitBuf = NULL;
  * Each buffer also has a private refcount that keeps track of the number of
  * times the buffer is pinned in the current process.  This is so that the
  * shared refcount needs to be modified only once if a buffer is pinned more
- * than once by an individual backend.  It's also used to check that no buffers
- * are still pinned at the end of transactions and when exiting.
+ * than once by an individual backend.  It's also used to check that no
+ * buffers are still pinned at the end of transactions and when exiting. We
+ * also use this mechanism to track whether this backend has a buffer locked,
+ * and, if so, in what mode.
  *
  *
  * To avoid - as we used to - requiring an array with NBuffers entries to keep
@@ -254,8 +263,8 @@ static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
 /* ResourceOwner callbacks to hold in-progress I/Os and buffer pins */
 static void ResOwnerReleaseBufferIO(Datum res);
 static char *ResOwnerPrintBufferIO(Datum res);
-static void ResOwnerReleaseBufferPin(Datum res);
-static char *ResOwnerPrintBufferPin(Datum res);
+static void ResOwnerReleaseBuffer(Datum res);
+static char *ResOwnerPrintBuffer(Datum res);
 
 const ResourceOwnerDesc buffer_io_resowner_desc =
 {
@@ -266,13 +275,13 @@ const ResourceOwnerDesc buffer_io_resowner_desc =
 	.DebugPrint = ResOwnerPrintBufferIO
 };
 
-const ResourceOwnerDesc buffer_pin_resowner_desc =
+const ResourceOwnerDesc buffer_resowner_desc =
 {
-	.name = "buffer pin",
+	.name = "buffer",
 	.release_phase = RESOURCE_RELEASE_BEFORE_LOCKS,
 	.release_priority = RELEASE_PRIO_BUFFER_PINS,
-	.ReleaseResource = ResOwnerReleaseBufferPin,
-	.DebugPrint = ResOwnerPrintBufferPin
+	.ReleaseResource = ResOwnerReleaseBuffer,
+	.DebugPrint = ResOwnerPrintBuffer
 };
 
 /*
@@ -351,6 +360,7 @@ ReservePrivateRefCountEntry(void)
 		/* clear the whole data member, just for future proofing */
 		memset(&victim_entry->data, 0, sizeof(victim_entry->data));
 		victim_entry->data.refcount = 0;
+		victim_entry->data.lockmode = BUFFER_LOCK_UNLOCK;
 
 		PrivateRefCountOverflowed++;
 	}
@@ -374,6 +384,7 @@ NewPrivateRefCountEntry(Buffer buffer)
 	PrivateRefCountArrayKeys[ReservedRefCountSlot] = buffer;
 	res->buffer = buffer;
 	res->data.refcount = 0;
+	res->data.lockmode = BUFFER_LOCK_UNLOCK;
 
 	/* update cache for the next lookup */
 	PrivateRefCountEntryLast = ReservedRefCountSlot;
@@ -540,6 +551,7 @@ static void
 ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 {
 	Assert(ref->data.refcount == 0);
+	Assert(ref->data.lockmode == BUFFER_LOCK_UNLOCK);
 
 	if (ref >= &PrivateRefCountArray[0] &&
 		ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
@@ -641,14 +653,27 @@ static void RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 #ifdef USE_ASSERT_CHECKING
-static void AssertNotCatalogBufferLock(LWLock *lock, LWLockMode mode,
-									   void *unused_context);
+static void AssertNotCatalogBufferLock(Buffer buffer, BufferLockMode mode);
 #endif
 static int	rlocator_comparator(const void *p1, const void *p2);
 static inline int buffertag_comparator(const BufferTag *ba, const BufferTag *bb);
 static inline int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b);
 static int	ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
 
+static void BufferLockAcquire(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode);
+static void BufferLockUnlock(Buffer buffer, BufferDesc *buf_hdr);
+static bool BufferLockConditional(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode);
+static bool BufferLockHeldByMeInMode(BufferDesc *buf_hdr, BufferLockMode mode);
+static bool BufferLockHeldByMe(BufferDesc *buf_hdr);
+static inline void BufferLockDisown(Buffer buffer, BufferDesc *buf_hdr);
+static inline int BufferLockDisownInternal(Buffer buffer, BufferDesc *buf_hdr);
+static inline bool BufferLockAttempt(BufferDesc *buf_hdr, BufferLockMode mode);
+static void BufferLockQueueSelf(BufferDesc *buf_hdr, BufferLockMode mode);
+static void BufferLockDequeueSelf(BufferDesc *buf_hdr);
+static void BufferLockWakeup(BufferDesc *buf_hdr, bool unlocked);
+static void BufferLockProcessRelease(BufferDesc *buf_hdr, BufferLockMode mode, uint64 lockstate);
+static inline uint64 BufferLockReleaseSub(BufferLockMode mode);
+
 
 /*
  * Implementation of PrefetchBuffer() for shared buffers.
@@ -2306,6 +2331,12 @@ retry:
 		goto retry;
 	}
 
+	/*
+	 * An invalidated buffer should not have any backends waiting to lock the
+	 * buffer, therefore BM_LOCK_WAKE_IN_PROGRESS should not be set.
+	 */
+	Assert(!(buf_state & BM_LOCK_WAKE_IN_PROGRESS));
+
 	/*
 	 * Clear out the buffer's tag and flags.  We must do this to ensure that
 	 * linear scans of the buffer array don't think the buffer is valid.
@@ -2382,6 +2413,12 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
 		return false;
 	}
 
+	/*
+	 * An invalidated buffer should not have any backends waiting to lock the
+	 * buffer, therefore BM_LOCK_WAKE_IN_PROGRESS should not be set.
+	 */
+	Assert(!(buf_state & BM_LOCK_WAKE_IN_PROGRESS));
+
 	/*
 	 * Clear out the buffer's tag and flags and usagecount.  This is not
 	 * strictly required, as BM_TAG_VALID/BM_VALID needs to be checked before
@@ -2449,8 +2486,6 @@ again:
 	 */
 	if (buf_state & BM_DIRTY)
 	{
-		LWLock	   *content_lock;
-
 		Assert(buf_state & BM_TAG_VALID);
 		Assert(buf_state & BM_VALID);
 
@@ -2468,8 +2503,7 @@ again:
 		 * one just happens to be trying to split the page the first one got
 		 * from StrategyGetBuffer.)
 		 */
-		content_lock = BufferDescriptorGetContentLock(buf_hdr);
-		if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE))
 		{
 			/*
 			 * Someone else has locked the buffer, so give it up and loop back
@@ -2498,7 +2532,7 @@ again:
 			if (XLogNeedsFlush(lsn)
 				&& StrategyRejectBuffer(strategy, buf_hdr, from_ring))
 			{
-				LWLockRelease(content_lock);
+				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 				UnpinBuffer(buf_hdr);
 				goto again;
 			}
@@ -2506,7 +2540,7 @@ again:
 
 		/* OK, do the I/O */
 		FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
-		LWLockRelease(content_lock);
+		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
 		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
 									  &buf_hdr->tag);
@@ -2948,7 +2982,7 @@ BufferIsLockedByMe(Buffer buffer)
 	else
 	{
 		bufHdr = GetBufferDescriptor(buffer - 1);
-		return LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr));
+		return BufferLockHeldByMe(bufHdr);
 	}
 }
 
@@ -2973,23 +3007,8 @@ BufferIsLockedByMeInMode(Buffer buffer, BufferLockMode mode)
 	}
 	else
 	{
-		LWLockMode	lw_mode;
-
-		switch (mode)
-		{
-			case BUFFER_LOCK_EXCLUSIVE:
-				lw_mode = LW_EXCLUSIVE;
-				break;
-			case BUFFER_LOCK_SHARE:
-				lw_mode = LW_SHARED;
-				break;
-			default:
-				pg_unreachable();
-		}
-
 		bufHdr = GetBufferDescriptor(buffer - 1);
-		return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
-									lw_mode);
+		return BufferLockHeldByMeInMode(bufHdr, mode);
 	}
 }
 
@@ -3376,7 +3395,7 @@ UnpinBufferNoOwner(BufferDesc *buf)
 		 * I'd better not still hold the buffer content lock. Can't use
 		 * BufferIsLockedByMe(), as that asserts the buffer is pinned.
 		 */
-		Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
+		Assert(!BufferLockHeldByMe(buf));
 
 		/* decrement the shared reference count */
 		old_buf_state = pg_atomic_fetch_sub_u64(&buf->state, BUF_REFCOUNT_ONE);
@@ -4198,9 +4217,9 @@ CheckForBufferLeaks(void)
  * Check for exclusive-locked catalog buffers.  This is the core of
  * AssertCouldGetRelation().
  *
- * A backend would self-deadlock on LWLocks if the catalog scan read the
- * exclusive-locked buffer.  The main threat is exclusive-locked buffers of
- * catalogs used in relcache, because a catcache search on any catalog may
+ * A backend would self-deadlock on the content lock if the catalog scan read
+ * the exclusive-locked buffer.  The main threat is exclusive-locked buffers
+ * of catalogs used in relcache, because a catcache search on any catalog may
  * build that catalog's relcache entry.  We don't have an inventory of
  * catalogs relcache uses, so just check buffers of most catalogs.
  *
@@ -4214,26 +4233,45 @@ CheckForBufferLeaks(void)
 void
 AssertBufferLocksPermitCatalogRead(void)
 {
-	ForEachLWLockHeldByMe(AssertNotCatalogBufferLock, NULL);
+	PrivateRefCountEntry *res;
+
+	/* check the array */
+	for (int i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
+	{
+		if (PrivateRefCountArrayKeys[i] != InvalidBuffer)
+		{
+			res = &PrivateRefCountArray[i];
+
+			if (res->buffer == InvalidBuffer)
+				continue;
+
+			AssertNotCatalogBufferLock(res->buffer, res->data.lockmode);
+		}
+	}
+
+	/* if necessary search the hash */
+	if (PrivateRefCountOverflowed)
+	{
+		HASH_SEQ_STATUS hstat;
+
+		hash_seq_init(&hstat, PrivateRefCountHash);
+		while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			AssertNotCatalogBufferLock(res->buffer, res->data.lockmode);
+		}
+	}
 }
 
 static void
-AssertNotCatalogBufferLock(LWLock *lock, LWLockMode mode,
-						   void *unused_context)
+AssertNotCatalogBufferLock(Buffer buffer, BufferLockMode mode)
 {
-	BufferDesc *bufHdr;
+	BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
 	BufferTag	tag;
 	Oid			relid;
 
-	if (mode != LW_EXCLUSIVE)
+	if (mode != BUFFER_LOCK_EXCLUSIVE)
 		return;
 
-	if (!((BufferDescPadded *) lock > BufferDescriptors &&
-		  (BufferDescPadded *) lock < BufferDescriptors + NBuffers))
-		return;					/* not a buffer lock */
-
-	bufHdr = (BufferDesc *)
-		((char *) lock - offsetof(BufferDesc, content_lock));
 	tag = bufHdr->tag;
 
 	/*
@@ -4515,9 +4553,11 @@ static void
 FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 					IOObject io_object, IOContext io_context)
 {
-	LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
+	Buffer		buffer = BufferDescriptorGetBuffer(buf);
+
+	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE);
 	FlushBuffer(buf, reln, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
-	LWLockRelease(BufferDescriptorGetContentLock(buf));
+	BufferLockUnlock(buffer, buf);
 }
 
 /*
@@ -5660,9 +5700,10 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
  *
  * Used to clean up after errors.
  *
- * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
- * of releasing buffer content locks per se; the only thing we need to deal
- * with here is clearing any PIN_COUNT request that was in progress.
+ * Currently, we can expect that resource owner cleanup, via
+ * ResOwnerReleaseBufferPin(), took care of releasing buffer content locks per
+ * se; the only thing we need to deal with here is clearing any PIN_COUNT
+ * request that was in progress.
  */
 void
 UnlockBuffers(void)
@@ -5693,25 +5734,726 @@ UnlockBuffers(void)
 }
 
 /*
- * Acquire or release the content_lock for the buffer.
+ * Acquire the buffer content lock in the specified mode
+ *
+ * If the lock is not available, sleep until it is.
+ *
+ * Side effect: cancel/die interrupts are held off until lock release.
+ *
+ * This uses almost the same locking approach as lwlock.c's
+ * LWLockAcquire(). See documentation at the top of lwlock.c for a more
+ * detailed discussion.
+ *
+ * The reason that this, and most of the other BufferLock* functions, get both
+ * the Buffer and BufferDesc* as parameters, is that looking up one from the
+ * other repeatedly shows up noticeably in profiles.
+ *
+ * Callers should provide a constant for mode, for more efficient code
+ * generation.
+ */
+static inline void
+BufferLockAcquire(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode)
+{
+	PrivateRefCountEntry *entry;
+	int			extraWaits = 0;
+
+	/*
+	 * Get reference to the refcount entry before we hold the lock, it seems
+	 * better to do before holding the lock.
+	 */
+	entry = GetPrivateRefCountEntry(buffer, true);
+
+	/*
+	 * We better not already hold a lock on the buffer.
+	 */
+	Assert(entry->data.lockmode == BUFFER_LOCK_UNLOCK);
+
+	/*
+	 * Lock out cancel/die interrupts until we exit the code section protected
+	 * by the content lock.  This ensures that interrupts will not interfere
+	 * with manipulations of data structures in shared memory.
+	 */
+	HOLD_INTERRUPTS();
+
+	for (;;)
+	{
+		bool		mustwait;
+		uint32		wait_event;
+
+		/*
+		 * Try to grab the lock the first time, we're not in the waitqueue
+		 * yet/anymore.
+		 */
+		mustwait = BufferLockAttempt(buf_hdr, mode);
+
+		if (likely(!mustwait))
+		{
+			break;
+		}
+
+		/*
+		 * Ok, at this point we couldn't grab the lock on the first try. We
+		 * cannot simply queue ourselves to the end of the list and wait to be
+		 * woken up because by now the lock could long have been released.
+		 * Instead add us to the queue and try to grab the lock again. If we
+		 * succeed we need to revert the queuing and be happy, otherwise we
+		 * recheck the lock. If we still couldn't grab it, we know that the
+		 * other locker will see our queue entries when releasing since they
+		 * existed before we checked for the lock.
+		 */
+
+		/* add to the queue */
+		BufferLockQueueSelf(buf_hdr, mode);
+
+		/* we're now guaranteed to be woken up if necessary */
+		mustwait = BufferLockAttempt(buf_hdr, mode);
+
+		/* ok, grabbed the lock the second time round, need to undo queueing */
+		if (!mustwait)
+		{
+			BufferLockDequeueSelf(buf_hdr);
+			break;
+		}
+
+		switch (mode)
+		{
+			case BUFFER_LOCK_EXCLUSIVE:
+				wait_event = WAIT_EVENT_BUFFER_EXCLUSIVE;
+				break;
+			case BUFFER_LOCK_SHARE_EXCLUSIVE:
+				wait_event = WAIT_EVENT_BUFFER_SHARE_EXCLUSIVE;
+				break;
+			case BUFFER_LOCK_SHARE:
+				wait_event = WAIT_EVENT_BUFFER_SHARED;
+				break;
+			case BUFFER_LOCK_UNLOCK:
+				pg_unreachable();
+
+		}
+		pgstat_report_wait_start(wait_event);
+
+		/*
+		 * Wait until awakened.
+		 *
+		 * It is possible that we get awakened for a reason other than being
+		 * signaled by BufferLockWakeup().  If so, loop back and wait again.
+		 * Once we've gotten the lock, re-increment the sema by the number of
+		 * additional signals received.
+		 */
+		for (;;)
+		{
+			PGSemaphoreLock(MyProc->sem);
+			if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
+				break;
+			extraWaits++;
+		}
+
+		pgstat_report_wait_end();
+
+		/* Retrying, allow BufferLockRelease to release waiters again. */
+		pg_atomic_fetch_and_u64(&buf_hdr->state, ~BM_LOCK_WAKE_IN_PROGRESS);
+	}
+
+	/* Remember that we now hold this lock */
+	entry->data.lockmode = mode;
+
+	/*
+	 * Fix the process wait semaphore's count for any absorbed wakeups.
+	 */
+	while (unlikely(extraWaits-- > 0))
+		PGSemaphoreUnlock(MyProc->sem);
+}
+
+/*
+ * Release a previously acquired buffer content lock.
+ */
+static void
+BufferLockUnlock(Buffer buffer, BufferDesc *buf_hdr)
+{
+	BufferLockMode mode;
+	uint64		oldstate;
+	uint64		sub;
+
+	mode = BufferLockDisownInternal(buffer, buf_hdr);
+
+	/*
+	 * Release my hold on lock, after that it can immediately be acquired by
+	 * others, even if we still have to wakeup other waiters.
+	 */
+	sub = BufferLockReleaseSub(mode);
+
+	oldstate = pg_atomic_sub_fetch_u64(&buf_hdr->state, sub);
+
+	BufferLockProcessRelease(buf_hdr, mode, oldstate);
+
+	/*
+	 * Now okay to allow cancel/die interrupts.
+	 */
+	RESUME_INTERRUPTS();
+}
+
+
+/*
+ * Acquire the content lock for the buffer, but only if we don't have to wait.
+ */
+static bool
+BufferLockConditional(Buffer buffer, BufferDesc *buf_hdr, BufferLockMode mode)
+{
+	PrivateRefCountEntry *entry = GetPrivateRefCountEntry(buffer, true);
+	bool		mustwait;
+
+	/*
+	 * We better not already hold a lock on the buffer.
+	 */
+	Assert(entry->data.lockmode == BUFFER_LOCK_UNLOCK);
+
+	/*
+	 * Lock out cancel/die interrupts until we exit the code section protected
+	 * by the content lock.  This ensures that interrupts will not interfere
+	 * with manipulations of data structures in shared memory.
+	 */
+	HOLD_INTERRUPTS();
+
+	/* Check for the lock */
+	mustwait = BufferLockAttempt(buf_hdr, mode);
+
+	if (mustwait)
+	{
+		/* Failed to get lock, so release interrupt holdoff */
+		RESUME_INTERRUPTS();
+	}
+	else
+	{
+		entry->data.lockmode = mode;
+	}
+
+	return !mustwait;
+}
+
+/*
+ * Internal function that tries to atomically acquire the content lock in the
+ * passed in mode.
+ *
+ * This function will not block waiting for a lock to become free - that's the
+ * caller's job.
+ *
+ * Similar to LWLockAttemptLock().
+ */
+static inline bool
+BufferLockAttempt(BufferDesc *buf_hdr, BufferLockMode mode)
+{
+	uint64		old_state;
+
+	/*
+	 * Read once outside the loop, later iterations will get the newer value
+	 * via compare & exchange.
+	 */
+	old_state = pg_atomic_read_u64(&buf_hdr->state);
+
+	/* loop until we've determined whether we could acquire the lock or not */
+	while (true)
+	{
+		uint64		desired_state;
+		bool		lock_free;
+
+		desired_state = old_state;
+
+		if (mode == BUFFER_LOCK_EXCLUSIVE)
+		{
+			lock_free = (old_state & BM_LOCK_MASK) == 0;
+			if (lock_free)
+				desired_state += BM_LOCK_VAL_EXCLUSIVE;
+		}
+		else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+		{
+			lock_free = (old_state & (BM_LOCK_VAL_EXCLUSIVE | BM_LOCK_VAL_SHARE_EXCLUSIVE)) == 0;
+			if (lock_free)
+				desired_state += BM_LOCK_VAL_SHARE_EXCLUSIVE;
+		}
+		else
+		{
+			lock_free = (old_state & BM_LOCK_VAL_EXCLUSIVE) == 0;
+			if (lock_free)
+				desired_state += BM_LOCK_VAL_SHARED;
+		}
+
+		/*
+		 * Attempt to swap in the state we are expecting. If we didn't see
+		 * lock to be free, that's just the old value. If we saw it as free,
+		 * we'll attempt to mark it acquired. The reason that we always swap
+		 * in the value is that this doubles as a memory barrier. We could try
+		 * to be smarter and only swap in values if we saw the lock as free,
+		 * but benchmark haven't shown it as beneficial so far.
+		 *
+		 * Retry if the value changed since we last looked at it.
+		 */
+		if (likely(pg_atomic_compare_exchange_u64(&buf_hdr->state,
+												  &old_state, desired_state)))
+		{
+			if (lock_free)
+			{
+				/* Great! Got the lock. */
+				return false;
+			}
+			else
+				return true;	/* somebody else has the lock */
+		}
+	}
+
+	pg_unreachable();
+}
+
+/*
+ * Add ourselves to the end of the content lock's wait queue.
+ */
+static void
+BufferLockQueueSelf(BufferDesc *buf_hdr, BufferLockMode mode)
+{
+	/*
+	 * If we don't have a PGPROC structure, there's no way to wait. This
+	 * should never occur, since MyProc should only be null during shared
+	 * memory initialization.
+	 */
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
+	if (MyProc->lwWaiting != LW_WS_NOT_WAITING)
+		elog(PANIC, "queueing for lock while waiting on another one");
+
+	LockBufHdr(buf_hdr);
+
+	/* setting the flag is protected by the spinlock */
+	pg_atomic_fetch_or_u64(&buf_hdr->state, BM_LOCK_HAS_WAITERS);
+
+	/*
+	 * These are currently used both for lwlocks and buffer content locks,
+	 * which is acceptable, although not pretty, because a backend can't wait
+	 * for both types of locks at the same time.
+	 */
+	MyProc->lwWaiting = LW_WS_WAITING;
+	MyProc->lwWaitMode = mode;
+
+	proclist_push_tail(&buf_hdr->lock_waiters, MyProcNumber, lwWaitLink);
+
+	/* Can release the mutex now */
+	UnlockBufHdr(buf_hdr);
+}
+
+/*
+ * Remove ourselves from the waitlist.
+ *
+ * This is used if we queued ourselves because we thought we needed to sleep
+ * but, after further checking, we discovered that we don't actually need to
+ * do so.
+ */
+static void
+BufferLockDequeueSelf(BufferDesc *buf_hdr)
+{
+	bool		on_waitlist;
+
+	LockBufHdr(buf_hdr);
+
+	on_waitlist = MyProc->lwWaiting == LW_WS_WAITING;
+	if (on_waitlist)
+		proclist_delete(&buf_hdr->lock_waiters, MyProcNumber, lwWaitLink);
+
+	if (proclist_is_empty(&buf_hdr->lock_waiters) &&
+		(pg_atomic_read_u64(&buf_hdr->state) & BM_LOCK_HAS_WAITERS) != 0)
+	{
+		pg_atomic_fetch_and_u64(&buf_hdr->state, ~BM_LOCK_HAS_WAITERS);
+	}
+
+	/* XXX: combine with fetch_and above? */
+	UnlockBufHdr(buf_hdr);
+
+	/* clear waiting state again, nice for debugging */
+	if (on_waitlist)
+		MyProc->lwWaiting = LW_WS_NOT_WAITING;
+	else
+	{
+		int			extraWaits = 0;
+
+
+		/*
+		 * Somebody else dequeued us and has or will wake us up. Deal with the
+		 * superfluous absorption of a wakeup.
+		 */
+
+		/*
+		 * Clear BM_LOCK_WAKE_IN_PROGRESS if somebody woke us before we
+		 * removed ourselves - they'll have set it.
+		 */
+		pg_atomic_fetch_and_u64(&buf_hdr->state, ~BM_LOCK_WAKE_IN_PROGRESS);
+
+		/*
+		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
+		 * get reset at some inconvenient point later. Most of the time this
+		 * will immediately return.
+		 */
+		for (;;)
+		{
+			PGSemaphoreLock(MyProc->sem);
+			if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
+				break;
+			extraWaits++;
+		}
+
+		/*
+		 * Fix the process wait semaphore's count for any absorbed wakeups.
+		 */
+		while (extraWaits-- > 0)
+			PGSemaphoreUnlock(MyProc->sem);
+	}
+}
+
+/*
+ * Stop treating lock as held by current backend.
+ *
+ * After calling this function it's the callers responsibility to ensure that
+ * the lock gets released, even in case of an error. This only is desirable if
+ * the lock is going to be released in a different process than the process
+ * that acquired it.
+ */
+static inline void
+BufferLockDisown(Buffer buffer, BufferDesc *buf_hdr)
+{
+	BufferLockDisownInternal(buffer, buf_hdr);
+	RESUME_INTERRUPTS();
+}
+
+/*
+ * Stop treating lock as held by current backend.
+ *
+ * This is the code that can be shared between actually releasing a lock
+ * (BufferLockUnlock()) and just not tracking ownership of the lock anymore
+ * without releasing the lock (BufferLockDisown()).
+ */
+static inline int
+BufferLockDisownInternal(Buffer buffer, BufferDesc *buf_hdr)
+{
+	BufferLockMode mode;
+	PrivateRefCountEntry *ref;
+
+	ref = GetPrivateRefCountEntry(buffer, false);
+	if (ref == NULL)
+		elog(ERROR, "lock %d is not held", buffer);
+	mode = ref->data.lockmode;
+	ref->data.lockmode = BUFFER_LOCK_UNLOCK;
+
+	return mode;
+}
+
+/*
+ * Wakeup all the lockers that currently have a chance to acquire the lock.
+ *
+ * wake_exclusive indicates whether exclusive lock waiters should be woken up.
+ */
+static void
+BufferLockWakeup(BufferDesc *buf_hdr, bool wake_exclusive)
+{
+	bool		new_wake_in_progress = false;
+	bool		wake_share_exclusive = true;
+	proclist_head wakeup;
+	proclist_mutable_iter iter;
+
+	proclist_init(&wakeup);
+
+	/* lock wait list while collecting backends to wake up */
+	LockBufHdr(buf_hdr);
+
+	proclist_foreach_modify(iter, &buf_hdr->lock_waiters, lwWaitLink)
+	{
+		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
+
+		/*
+		 * Already woke up a conflicting lock, so skip over this wait list
+		 * entry.
+		 */
+		if (!wake_exclusive && waiter->lwWaitMode == BUFFER_LOCK_EXCLUSIVE)
+			continue;
+		if (!wake_share_exclusive && waiter->lwWaitMode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+			continue;
+
+		proclist_delete(&buf_hdr->lock_waiters, iter.cur, lwWaitLink);
+		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
+
+		/*
+		 * Prevent additional wakeups until retryer gets to run. Backends that
+		 * are just waiting for the lock to become free don't retry
+		 * automatically.
+		 */
+		new_wake_in_progress = true;
+
+		/*
+		 * Signal that the process isn't on the wait list anymore. This allows
+		 * BufferLockDequeueSelf() to remove itself from the waitlist with a
+		 * proclist_delete(), rather than having to check if it has been
+		 * removed from the list.
+		 */
+		Assert(waiter->lwWaiting == LW_WS_WAITING);
+		waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
+
+		/*
+		 * Don't wakeup further waiters after waking a conflicting waiter.
+		 */
+		if (waiter->lwWaitMode == BUFFER_LOCK_SHARE)
+		{
+			/*
+			 * Share locks conflict with exclusive locks.
+			 */
+			wake_exclusive = false;
+		}
+		else if (waiter->lwWaitMode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+		{
+			/*
+			 * Share-exclusive locks conflict with share-exclusive and
+			 * exclusive locks.
+			 */
+			wake_exclusive = false;
+			wake_share_exclusive = false;
+		}
+		else if (waiter->lwWaitMode == BUFFER_LOCK_EXCLUSIVE)
+		{
+
+			/*
+			 * Exclusive locks conflict with all other locks, there's no point
+			 * in waking up anybody else.
+			 */
+			break;
+		}
+	}
+
+	Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u64(&buf_hdr->state) & BM_LOCK_HAS_WAITERS);
+
+	/* unset required flags, and release lock, in one fell swoop */
+	{
+		uint64		old_state;
+		uint64		desired_state;
+
+		old_state = pg_atomic_read_u64(&buf_hdr->state);
+		while (true)
+		{
+			desired_state = old_state;
+
+			/* compute desired flags */
+
+			if (new_wake_in_progress)
+				desired_state |= BM_LOCK_WAKE_IN_PROGRESS;
+			else
+				desired_state &= ~BM_LOCK_WAKE_IN_PROGRESS;
+
+			if (proclist_is_empty(&buf_hdr->lock_waiters))
+				desired_state &= ~BM_LOCK_HAS_WAITERS;
+
+			desired_state &= ~BM_LOCKED;	/* release lock */
+
+			if (pg_atomic_compare_exchange_u64(&buf_hdr->state, &old_state,
+											   desired_state))
+				break;
+		}
+	}
+
+	/* Awaken any waiters I removed from the queue. */
+	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
+	{
+		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
+
+		proclist_delete(&wakeup, iter.cur, lwWaitLink);
+
+		/*
+		 * Guarantee that lwWaiting being unset only becomes visible once the
+		 * unlink from the link has completed. Otherwise the target backend
+		 * could be woken up for other reason and enqueue for a new lock - if
+		 * that happens before the list unlink happens, the list would end up
+		 * being corrupted.
+		 *
+		 * The barrier pairs with the LockBufHdr() when enqueuing for another
+		 * lock.
+		 */
+		pg_write_barrier();
+		waiter->lwWaiting = LW_WS_NOT_WAITING;
+		PGSemaphoreUnlock(waiter->sem);
+	}
+}
+
+/*
+ * Compute subtraction from buffer state for a release of a held lock in
+ * `mode`.
+ *
+ * This is separated from BufferLockUnlock() as we want to combine the lock
+ * release with other atomic operations when possible, leading to the lock
+ * release being done in multiple places, each needing to compute what to
+ * subtract from the lock state.
+ */
+static inline uint64
+BufferLockReleaseSub(BufferLockMode mode)
+{
+
+	/*
+	 * Turns out that a switch() leads gcc to generate sufficiently worse code
+	 * for this to show up in profiles...
+	 */
+	if (mode == BUFFER_LOCK_EXCLUSIVE)
+		return BM_LOCK_VAL_EXCLUSIVE;
+	else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+		return BM_LOCK_VAL_SHARE_EXCLUSIVE;
+	else
+	{
+		Assert(mode == BUFFER_LOCK_SHARE);
+		return BM_LOCK_VAL_SHARED;
+	}
+
+	return 0;					/* keep compiler quiet */
+}
+
+/*
+ * Handle work that needs to be done after releasing a lock that was held in
+ * `mode`, where `lockstate` is the result of the atomic operation modifying
+ * the state variable.
+ *
+ * This is separated from BufferLockUnlock() as we want to combine the lock
+ * release with other atomic operations when possible, leading to the lock
+ * release being done in multiple places.
+ */
+static void
+BufferLockProcessRelease(BufferDesc *buf_hdr, BufferLockMode mode, uint64 lockstate)
+{
+	bool		check_waiters = false;
+	bool		wake_exclusive = false;
+
+	/* nobody else can have that kind of lock */
+	Assert(!(lockstate & BM_LOCK_VAL_EXCLUSIVE));
+
+	/*
+	 * If we're still waiting for backends to get scheduled, don't wake them
+	 * up again. Otherwise check if we need to look through the waitqueue to
+	 * wake other backends.
+	 */
+	if ((lockstate & BM_LOCK_HAS_WAITERS) &&
+		!(lockstate & BM_LOCK_WAKE_IN_PROGRESS))
+	{
+		if ((lockstate & BM_LOCK_MASK) == 0)
+		{
+			/*
+			 * We released a lock and the lock was, in that moment, free. We
+			 * therefore can wake waiters for any kind of lock.
+			 */
+			check_waiters = true;
+			wake_exclusive = true;
+		}
+		else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+		{
+			/*
+			 * We released the lock, but another backend still holds a lock.
+			 * We can't have released an exclusive lock, as there couldn't
+			 * have been other lock holders. If we released a share lock, no
+			 * waiters need to be woken up, as there must be other share
+			 * lockers. However, if we held a share-exclusive lock, another
+			 * backend now could acquire a share-exclusive lock.
+			 */
+			check_waiters = true;
+			wake_exclusive = false;
+		}
+	}
+
+	/*
+	 * As waking up waiters requires the spinlock to be acquired, only do so
+	 * if necessary.
+	 */
+	if (check_waiters)
+		BufferLockWakeup(buf_hdr, wake_exclusive);
+}
+
+/*
+ * BufferLockHeldByMeInMode - test whether my process holds the content lock
+ * in the specified mode
+ *
+ * This is meant as debug support only.
+ */
+static bool
+BufferLockHeldByMeInMode(BufferDesc *buf_hdr, BufferLockMode mode)
+{
+	PrivateRefCountEntry *entry =
+		GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf_hdr), false);
+
+	if (!entry)
+		return false;
+	else
+		return entry->data.lockmode == mode;
+
+}
+
+/*
+ * BufferLockHeldByMe - test whether my process holds the content lock in any
+ * mode
+ *
+ * This is meant as debug support only.
+ */
+static bool
+BufferLockHeldByMe(BufferDesc *buf_hdr)
+{
+	PrivateRefCountEntry *entry =
+		GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf_hdr), false);
+
+	if (!entry)
+		return false;
+	else
+		return entry->data.lockmode != BUFFER_LOCK_UNLOCK;
+}
+
+/*
+ * Release the content lock for the buffer.
+ */
+void
+UnlockBuffer(Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+
+	Assert(BufferIsPinned(buffer));
+	if (BufferIsLocal(buffer))
+		return;					/* local buffers need no lock */
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+	BufferLockUnlock(buffer, buf_hdr);
+}
+
+/*
+ * Acquire the content_lock for the buffer.
  */
 void
-LockBuffer(Buffer buffer, BufferLockMode mode)
+LockBufferInternal(Buffer buffer, BufferLockMode mode)
 {
-	BufferDesc *buf;
+	BufferDesc *buf_hdr;
+
+	/*
+	 * We can't wait if we haven't got a PGPROC.  This should only occur
+	 * during bootstrap or shared memory initialization.  Put an Assert here
+	 * to catch unsafe coding practices.
+	 */
+	Assert(!(MyProc == NULL && IsUnderPostmaster));
+
+	/* handled in LockBuffer() wrapper */
+	Assert(mode != BUFFER_LOCK_UNLOCK);
 
 	Assert(BufferIsPinned(buffer));
 	if (BufferIsLocal(buffer))
 		return;					/* local buffers need no lock */
 
-	buf = GetBufferDescriptor(buffer - 1);
+	buf_hdr = GetBufferDescriptor(buffer - 1);
 
-	if (mode == BUFFER_LOCK_UNLOCK)
-		LWLockRelease(BufferDescriptorGetContentLock(buf));
-	else if (mode == BUFFER_LOCK_SHARE)
-		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
+	/*
+	 * Test the most frequent lock modes first. While a switch (mode) would be
+	 * nice, at least gcc generates considerably worse code for it.
+	 *
+	 * Call BufferLockAcquire() with a constant argument for mode, to generate
+	 * more efficient code for the different lock modes.
+	 */
+	if (mode == BUFFER_LOCK_SHARE)
+		BufferLockAcquire(buffer, buf_hdr, BUFFER_LOCK_SHARE);
 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
-		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
+		BufferLockAcquire(buffer, buf_hdr, BUFFER_LOCK_EXCLUSIVE);
+	else if (mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+		BufferLockAcquire(buffer, buf_hdr, BUFFER_LOCK_SHARE_EXCLUSIVE);
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
 }
@@ -5732,8 +6474,7 @@ ConditionalLockBuffer(Buffer buffer)
 
 	buf = GetBufferDescriptor(buffer - 1);
 
-	return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
-									LW_EXCLUSIVE);
+	return BufferLockConditional(buffer, buf, BUFFER_LOCK_EXCLUSIVE);
 }
 
 /*
@@ -6247,8 +6988,8 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
 /*
  * AbortBufferIO: Clean up active buffer I/O after an error.
  *
- *	All LWLocks we might have held have been released,
- *	but we haven't yet released buffer pins, so the buffer is still pinned.
+ *	All LWLocks & content locks we might have held have been released, but we
+ *	haven't yet released buffer pins, so the buffer is still pinned.
  *
  *	If I/O was in progress, we always set BM_IO_ERROR, even though it's
  *	possible the error condition wasn't related to the I/O.
@@ -6676,8 +7417,14 @@ ResOwnerPrintBufferIO(Datum res)
 	return psprintf("lost track of buffer IO on buffer %d", buffer);
 }
 
+/*
+ * Release buffer as part of resource owner cleanup. This will only be called
+ * if the buffer is pinned. If this backend held the content lock at the time
+ * of the error we also need to release that (note that it is not possible to
+ * hold a content lock without a pin).
+ */
 static void
-ResOwnerReleaseBufferPin(Datum res)
+ResOwnerReleaseBuffer(Datum res)
 {
 	Buffer		buffer = DatumGetInt32(res);
 
@@ -6688,11 +7435,32 @@ ResOwnerReleaseBufferPin(Datum res)
 	if (BufferIsLocal(buffer))
 		UnpinLocalBufferNoOwner(buffer);
 	else
+	{
+		PrivateRefCountEntry *ref;
+
+		ref = GetPrivateRefCountEntry(buffer, false);
+
+		/* not having a private refcount would imply resowner corruption */
+		Assert(ref != NULL);
+
+		/*
+		 * If the buffer was locked at the time of the resowner release,
+		 * release the lock now. This should only happen after errors.
+		 */
+		if (ref->data.lockmode != BUFFER_LOCK_UNLOCK)
+		{
+			BufferDesc *buf = GetBufferDescriptor(buffer - 1);
+
+			HOLD_INTERRUPTS();	/* match the upcoming RESUME_INTERRUPTS */
+			BufferLockUnlock(buffer, buf);
+		}
+
 		UnpinBufferNoOwner(GetBufferDescriptor(buffer - 1));
+	}
 }
 
 static char *
-ResOwnerPrintBufferPin(Datum res)
+ResOwnerPrintBuffer(Datum res)
 {
 	return DebugPrintBufferRefcount(DatumGetInt32(res));
 }
@@ -6924,10 +7692,10 @@ MarkDirtyUnpinnedBufferInternal(Buffer buf, BufferDesc *desc,
 	/* If it was not already dirty, mark it as dirty. */
 	if (!(buf_state & BM_DIRTY))
 	{
-		LWLockAcquire(BufferDescriptorGetContentLock(desc), LW_EXCLUSIVE);
+		BufferLockAcquire(buf, desc, BUFFER_LOCK_EXCLUSIVE);
 		MarkBufferDirty(buf);
 		result = true;
-		LWLockRelease(BufferDescriptorGetContentLock(desc));
+		BufferLockUnlock(buf, desc);
 	}
 	else
 		*buffer_already_dirty = true;
@@ -7178,16 +7946,12 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
 		 */
 		if (is_write && !is_temp)
 		{
-			LWLock	   *content_lock;
-
-			content_lock = BufferDescriptorGetContentLock(buf_hdr);
-
-			Assert(LWLockHeldByMe(content_lock));
+			Assert(BufferLockHeldByMe(buf_hdr));
 
 			/*
 			 * Lock is now owned by AIO subsystem.
 			 */
-			LWLockDisown(content_lock);
+			BufferLockDisown(buffer, buf_hdr);
 		}
 
 		/*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 3299de23bb3..b8936d30d7e 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -287,6 +287,9 @@ ABI_compatibility:
 Section: ClassName - WaitEventBuffer
 
 BUFFER_CLEANUP	"Waiting to acquire an exclusive pin on a buffer. Buffer pin waits can be protracted if another process holds an open cursor that last read data from the buffer in question."
+BUFFER_SHARED	"Waiting to acquire a shared lock on a buffer."
+BUFFER_SHARE_EXCLUSIVE	"Waiting to acquire a share exclusive lock on a buffer."
+BUFFER_EXCLUSIVE	"Waiting to acquire a exclusive lock on a buffer."
 
 ABI_compatibility:
 
@@ -374,7 +377,6 @@ MultiXactMemberBuffer	"Waiting for I/O on a multixact member SLRU buffer."
 NotifyBuffer	"Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
 SerialBuffer	"Waiting for I/O on a serializable transaction conflict SLRU buffer."
 WALInsert	"Waiting to insert WAL data into a memory buffer."
-BufferContent	"Waiting to access a data page in memory."
 ReplicationOriginState	"Waiting to read or update the progress of one replication origin."
 ReplicationSlotIO	"Waiting for I/O on a replication slot."
 LockFastPath	"Waiting to read or update a process' fast-path lock information."
-- 
2.48.1.76.g4e746b1a31.dirty



  [text/x-diff] v11-0003-lwlock-Remove-ForEachLWLockHeldByMe.patch (2.2K, 4-v11-0003-lwlock-Remove-ForEachLWLockHeldByMe.patch)
  download | inline diff:
From d2eabd283e76aeb1da967581d47b0576a104c28e Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 14 Jan 2026 13:39:50 -0500
Subject: [PATCH v11 3/7] lwlock: Remove ForEachLWLockHeldByMe

As of commit FIXME-XXX-UPDATEME, ForEachLWLockHeldByMe(), introduced in
f4ece891fc2f, is not used anymore, as content locks are now implemented in
bufmgr.c.  It doesn't seem that likely that a new user of the functionality
will appear all that soon, making removal of the function seem like the most
sensible path. It can easily be added back if necessary.

Discussion: https://postgr.es/m/lneuyxqxamqoayd2ntau3lqjblzdckw6tjgeu4574ezwh4tzlg%40noioxkquezdw
---
 src/include/storage/lwlock.h      |  2 --
 src/backend/storage/lmgr/lwlock.c | 15 ---------------
 2 files changed, 17 deletions(-)

diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a98d302c602..df589902adc 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -129,8 +129,6 @@ extern void LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64
 extern void LWLockReleaseAll(void);
 extern void LWLockDisown(LWLock *lock);
 extern void LWLockReleaseDisowned(LWLock *lock, LWLockMode mode);
-extern void ForEachLWLockHeldByMe(void (*callback) (LWLock *, LWLockMode, void *),
-								  void *context);
 extern bool LWLockHeldByMe(LWLock *lock);
 extern bool LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride);
 extern bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 415794682a2..2ee0339c52e 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1955,21 +1955,6 @@ LWLockReleaseAll(void)
 }
 
 
-/*
- * ForEachLWLockHeldByMe - run a callback for each held lock
- *
- * This is meant as debug support only.
- */
-void
-ForEachLWLockHeldByMe(void (*callback) (LWLock *, LWLockMode, void *),
-					  void *context)
-{
-	int			i;
-
-	for (i = 0; i < num_held_lwlocks; i++)
-		callback(held_lwlocks[i].lock, held_lwlocks[i].mode, context);
-}
-
 /*
  * LWLockHeldByMe - test whether my process holds a lock in any mode
  *
-- 
2.48.1.76.g4e746b1a31.dirty



  [text/x-diff] v11-0004-lwlock-Remove-support-for-disowned-lwlwocks.patch (4.6K, 5-v11-0004-lwlock-Remove-support-for-disowned-lwlwocks.patch)
  download | inline diff:
From 60c879adc2540dad404a05e7f2c20957573b4211 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 14 Jan 2026 13:54:00 -0500
Subject: [PATCH v11 4/7] lwlock: Remove support for disowned lwlwocks

This reverts commit f8d7f29b3e81db59b95e4b5baaa6943178c89fd8, plus parts of
subsequent commits fixing a typo an a parameter name.

Support for disowned lwlocks was added for the benefit of AIO, to be able to
have content locks "owned" by the AIO subsystem. But as of commit
FIXME-XXX-UPDATEME, content locks do not use lwlocks anymore.

It does not seem particularly likely that we need this facility outside of the
AIO use-case, therefore remove the now unused functions.

I did choose to keep the comment added in the aforementioned commit about
lock->owner intentionally being left pointing to the last owner.
---
 src/include/storage/lwlock.h      |  2 -
 src/backend/storage/lmgr/lwlock.c | 71 +++----------------------------
 2 files changed, 6 insertions(+), 67 deletions(-)

diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index df589902adc..9a0290391d0 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -127,8 +127,6 @@ extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
 extern void LWLockRelease(LWLock *lock);
 extern void LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
-extern void LWLockDisown(LWLock *lock);
-extern void LWLockReleaseDisowned(LWLock *lock, LWLockMode mode);
 extern bool LWLockHeldByMe(LWLock *lock);
 extern bool LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride);
 extern bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 2ee0339c52e..a133c97b992 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1783,25 +1783,18 @@ LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 
 
 /*
- * Stop treating lock as held by current backend.
- *
- * This is the code that can be shared between actually releasing a lock
- * (LWLockRelease()) and just not tracking ownership of the lock anymore
- * without releasing the lock (LWLockDisown()).
- *
- * Returns the mode in which the lock was held by the current backend.
- *
- * NB: This does not call RESUME_INTERRUPTS(), but leaves that responsibility
- * of the caller.
+ * LWLockRelease - release a previously acquired lock
  *
  * NB: This will leave lock->owner pointing to the current backend (if
  * LOCK_DEBUG is set). This is somewhat intentional, as it makes it easier to
  * debug cases of missing wakeups during lock release.
  */
-static inline LWLockMode
-LWLockDisownInternal(LWLock *lock)
+void
+LWLockRelease(LWLock *lock)
 {
 	LWLockMode	mode;
+	uint32		oldstate;
+	bool		check_waiters;
 	int			i;
 
 	/*
@@ -1821,18 +1814,7 @@ LWLockDisownInternal(LWLock *lock)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	return mode;
-}
-
-/*
- * Helper function to release lock, shared between LWLockRelease() and
- * LWLockReleaseDisowned().
- */
-static void
-LWLockReleaseInternal(LWLock *lock, LWLockMode mode)
-{
-	uint32		oldstate;
-	bool		check_waiters;
+	PRINT_LWDEBUG("LWLockRelease", lock, mode);
 
 	/*
 	 * Release my hold on lock, after that it can immediately be acquired by
@@ -1870,38 +1852,6 @@ LWLockReleaseInternal(LWLock *lock, LWLockMode mode)
 		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
 		LWLockWakeup(lock);
 	}
-}
-
-
-/*
- * Stop treating lock as held by current backend.
- *
- * After calling this function it's the callers responsibility to ensure that
- * the lock gets released (via LWLockReleaseDisowned()), even in case of an
- * error. This only is desirable if the lock is going to be released in a
- * different process than the process that acquired it.
- */
-void
-LWLockDisown(LWLock *lock)
-{
-	LWLockDisownInternal(lock);
-
-	RESUME_INTERRUPTS();
-}
-
-/*
- * LWLockRelease - release a previously acquired lock
- */
-void
-LWLockRelease(LWLock *lock)
-{
-	LWLockMode	mode;
-
-	mode = LWLockDisownInternal(lock);
-
-	PRINT_LWDEBUG("LWLockRelease", lock, mode);
-
-	LWLockReleaseInternal(lock, mode);
 
 	/*
 	 * Now okay to allow cancel/die interrupts.
@@ -1909,15 +1859,6 @@ LWLockRelease(LWLock *lock)
 	RESUME_INTERRUPTS();
 }
 
-/*
- * Release lock previously disowned with LWLockDisown().
- */
-void
-LWLockReleaseDisowned(LWLock *lock, LWLockMode mode)
-{
-	LWLockReleaseInternal(lock, mode);
-}
-
 /*
  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
  */
-- 
2.48.1.76.g4e746b1a31.dirty



  [text/x-diff] v11-0005-Require-share-exclusive-lock-to-set-hint-bits-an.patch (40.8K, 6-v11-0005-Require-share-exclusive-lock-to-set-hint-bits-an.patch)
  download | inline diff:
From ec49b1a8665a2ce78946b127360f16f92b58b00e Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 13 Jan 2026 20:10:32 -0500
Subject: [PATCH v11 5/7] Require share-exclusive lock to set hint bits and to
 flush

At the moment hint bits can be set with just a share lock on a page (and,
until 45f658dacb9, in one case even without any lock). Because of this we need
to copy pages while writing them out, as otherwise the checksum could be
corrupted.

The need to copy the page is problematic to implement AIO writes:

1) Instead of just needing a single buffer for a copied page we need one for
   each page that's potentially undergoing I/O
2) To be able to use the "worker" AIO implementation the copied page needs to
   reside in shared memory

It also causes problems for using unbuffered/direct-IO, independent of AIO:
Some filesystems, raid implementations, ... do not tolerate the data being
written out to change during the write. E.g. they may compute internal
checksums that can be invalidated by concurrent modifications, leading e.g. to
filesystem errors (as the case with btrfs).

It also just is plain odd to allow modifications of buffers that are just
share locked.

To address these issue, this commit changes the rules so that modifications to
pages are not allowed anymore while holding a share lock. Instead the new
share-exclusive lock (introduced in FIXME XXXX TODO) allows at most one
backend to modify a buffer while other backends have the same page share
locked. An existing share-lock can be upgraded to a share-exclusive lock, if
there are no conflicting locks. For that
BufferBeginSetHintBits()/BufferFinishSetHintBits() and BufferSetHintBits16()
have been introduced.

To prevent hint bits from being set while the buffer is being written out,
writing out buffers now requires a share-exclusive lock.

The use of share-exclusive to gate setting hint bits means that from now on
only one backend can set hint bits at a time. To allow multiple backends to
set hint bits would require more complicated locking, for setting hint bits
we'd need to store the count of backends currently setting hint bits and we
would need another lock-level for I/O conflicting with the lock-level to set
hint bits. Given that the share-exclusive lock for setting hint bits is only
held for a short time, that backends would often just set the same hint bits
and that the cost of occasionally not setting hint bits in hotly accessed
pages is fairly low, this seems like an acceptable tradeoff.

The biggest change to adapt to this is in heapam. To avoid performance
regressions for sequential scans that need to set a lot of hint bits, we need
to amortize the cost of BufferBeginSetHintBits() for cases where hint bits are
set at a high frequency, HeapTupleSatisfiesMVCCBatch() uses the new
SetHintBitsExt() which defers BufferFinishSetHintBits() until all hint bits on
a page have been set.  Conversely, to avoid regressions in cases where we
can't set hint bits in bulk (because we're looking only at individual tuples),
use BufferSetHintBits16() when setting hint bits without batching.

Several other places also need to be adapted, but those changes are
comparatively simpler.

After this we do not need to copy buffers to write them out anymore. That
change is done separately however.

TODO:
- Update commit reference above
- Update FIXME comments

Discussion: https://postgr.es/m/fvfmkr5kk4nyex56ejgxj3uzi63isfxovp2biecb4bspbjrze7@az2pljabhnff
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf%40gcnactj4z56m
---
 src/include/storage/bufmgr.h                |   4 +
 src/backend/access/gist/gistget.c           |  19 +-
 src/backend/access/hash/hashutil.c          |  10 +-
 src/backend/access/heap/heapam_visibility.c | 130 ++++++--
 src/backend/access/nbtree/nbtinsert.c       |  28 +-
 src/backend/access/nbtree/nbtutils.c        |  16 +-
 src/backend/storage/buffer/README           |  66 ++--
 src/backend/storage/buffer/bufmgr.c         | 327 ++++++++++++++++----
 src/backend/storage/freespace/freespace.c   |  14 +-
 src/backend/storage/freespace/fsmpage.c     |  11 +-
 src/tools/pgindent/typedefs.list            |   1 +
 11 files changed, 482 insertions(+), 144 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a40adf6b2a8..4017896f951 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -314,6 +314,10 @@ extern void BufferGetTag(Buffer buffer, RelFileLocator *rlocator,
 
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
 
+extern bool BufferSetHintBits16(uint16 *ptr, uint16 val, Buffer buffer);
+extern bool BufferBeginSetHintBits(Buffer buffer);
+extern void BufferFinishSetHintBits(Buffer buffer, bool mark_dirty, bool buffer_std);
+
 extern void UnlockBuffers(void);
 extern void UnlockBuffer(Buffer buffer);
 extern void LockBufferInternal(Buffer buffer, BufferLockMode mode);
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 11b214eb99b..fc346dc9484 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -64,11 +64,7 @@ gistkillitems(IndexScanDesc scan)
 	 * safe.
 	 */
 	if (BufferGetLSNAtomic(buffer) != so->curPageLSN)
-	{
-		UnlockReleaseBuffer(buffer);
-		so->numKilled = 0;		/* reset counter */
-		return;
-	}
+		goto unlock;
 
 	Assert(GistPageIsLeaf(page));
 
@@ -78,6 +74,16 @@ gistkillitems(IndexScanDesc scan)
 	 */
 	for (i = 0; i < so->numKilled; i++)
 	{
+		if (!killedsomething)
+		{
+			/*
+			 * Use hint bit infrastructure to be allowed to modify the page
+			 * without holding an exclusive lock.
+			 */
+			if (!BufferBeginSetHintBits(buffer))
+				goto unlock;
+		}
+
 		offnum = so->killedItems[i];
 		iid = PageGetItemId(page, offnum);
 		ItemIdMarkDead(iid);
@@ -87,9 +93,10 @@ gistkillitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		GistMarkPageHasGarbage(page);
-		MarkBufferDirtyHint(buffer, true);
+		BufferFinishSetHintBits(buffer, true, true);
 	}
 
+unlock:
 	UnlockReleaseBuffer(buffer);
 
 	/*
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index cf7f0b90176..b917c97321a 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -593,6 +593,13 @@ _hash_kill_items(IndexScanDesc scan)
 
 			if (ItemPointerEquals(&ituple->t_tid, &currItem->heapTid))
 			{
+				/*
+				 * Use hint bit infrastructure to be allowed to modify the
+				 * page without holding an exclusive lock.
+				 */
+				if (!BufferBeginSetHintBits(so->currPos.buf))
+					goto unlock_page;
+
 				/* found the item */
 				ItemIdMarkDead(iid);
 				killedsomething = true;
@@ -610,9 +617,10 @@ _hash_kill_items(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
-		MarkBufferDirtyHint(buf, true);
+		BufferFinishSetHintBits(so->currPos.buf, true, true);
 	}
 
+unlock_page:
 	if (so->hashso_bucket_buf == so->currPos.buf ||
 		havePin)
 		LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 75ae268d753..fc64f4343ce 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -80,10 +80,38 @@
 
 
 /*
- * SetHintBits()
+ * To be allowed to set hint bits, SetHintBits() needs to call
+ * BufferBeginSetHintBits(). However, that's not free, and some callsites call
+ * SetHintBits() on many tuples in a row. For those it makes sense to amortize
+ * the cost of BufferBeginSetHintBits(). Additionally it's desirable to defer
+ * the cost of BufferBeginSetHintBits() until a hint bit needs to actually be
+ * set. This enum serves as the necessary state space passed to
+ * SetHintBitsExt().
+ */
+typedef enum SetHintBitsState
+{
+	/* not yet checked if hint bits may be set */
+	SHB_INITIAL,
+	/* failed to get permission to set hint bits, don't check again */
+	SHB_DISABLED,
+	/* allowed to set hint bits */
+	SHB_ENABLED,
+} SetHintBitsState;
+
+/*
+ * SetHintBitsExt()
  *
  * Set commit/abort hint bits on a tuple, if appropriate at this time.
  *
+ * To be allowed to set a hint bit on a tuple, the page must not be undergoing
+ * IO at this time (otherwise we e.g. could corrupt PG's page checksum or even
+ * the filesystem's, as is known to happen with btrfs).
+ *
+ * The right to set a hint bit can be acquired on a page level with
+ * BufferBeginSetHintBits(). Only a single backend gets the right to set hint
+ * bits at a time.  Alternatively, if called with a NULL SetHintBitsState*,
+ * hint bits are set with BufferSetHintBits16().
+ *
  * It is only safe to set a transaction-committed hint bit if we know the
  * transaction's commit record is guaranteed to be flushed to disk before the
  * buffer, or if the table is temporary or unlogged and will be obliterated by
@@ -111,24 +139,67 @@
  * InvalidTransactionId if no check is needed.
  */
 static inline void
-SetHintBits(HeapTupleHeader tuple, Buffer buffer,
-			uint16 infomask, TransactionId xid)
+SetHintBitsExt(HeapTupleHeader tuple, Buffer buffer,
+			   uint16 infomask, TransactionId xid, SetHintBitsState *state)
 {
+	/*
+	 * In batched mode, if we previously did not get permission to set hint
+	 * bits, don't try again - in all likelihood IO is still going on.
+	 */
+	if (state && *state == SHB_DISABLED)
+		return;
+
 	if (TransactionIdIsValid(xid))
 	{
-		/* NB: xid must be known committed here! */
-		XLogRecPtr	commitLSN = TransactionIdGetCommitLSN(xid);
+		if (BufferIsPermanent(buffer))
+		{
+			/* NB: xid must be known committed here! */
+			XLogRecPtr	commitLSN = TransactionIdGetCommitLSN(xid);
+
+			if (XLogNeedsFlush(commitLSN) &&
+				BufferGetLSNAtomic(buffer) < commitLSN)
+			{
+				/* not flushed and no LSN interlock, so don't set hint */
+				return;
+			}
+		}
+	}
+
+	/*
+	 * If we're not operating in batch mode, use BufferSetHintBits16() to mark
+	 * the page dirty, that's cheaper than
+	 * BufferBeginSetHintBits()/BufferFinishSetHintBits(). That's important
+	 * for cases where we set a lot of hint bits on a page individually.
+	 */
+	if (!state)
+	{
+		BufferSetHintBits16(&tuple->t_infomask,
+							tuple->t_infomask | infomask, buffer);
+		return;
+	}
 
-		if (BufferIsPermanent(buffer) && XLogNeedsFlush(commitLSN) &&
-			BufferGetLSNAtomic(buffer) < commitLSN)
+	if (*state == SHB_INITIAL)
+	{
+		if (!BufferBeginSetHintBits(buffer))
 		{
-			/* not flushed and no LSN interlock, so don't set hint */
+			*state = SHB_DISABLED;
 			return;
 		}
-	}
 
+		*state = SHB_ENABLED;
+	}
 	tuple->t_infomask |= infomask;
-	MarkBufferDirtyHint(buffer, true);
+}
+
+/*
+ * Simple wrapper around SetHintBitExt(), use when operating on a single
+ * tuple.
+ */
+static inline void
+SetHintBits(HeapTupleHeader tuple, Buffer buffer,
+			uint16 infomask, TransactionId xid)
+{
+	SetHintBitsExt(tuple, buffer, infomask, xid, NULL);
 }
 
 /*
@@ -864,9 +935,9 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
  * inserting/deleting transaction was still running --- which was more cycles
  * and more contention on ProcArrayLock.
  */
-static bool
+static inline bool
 HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
-					   Buffer buffer)
+					   Buffer buffer, SetHintBitsState *state)
 {
 	HeapTupleHeader tuple = htup->t_data;
 
@@ -921,8 +992,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 			if (!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))
 			{
 				/* deleting subtransaction must have aborted */
-				SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-							InvalidTransactionId);
+				SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+							   InvalidTransactionId, state);
 				return true;
 			}
 
@@ -934,13 +1005,13 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
 			return false;
 		else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
-			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-						HeapTupleHeaderGetRawXmin(tuple));
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_COMMITTED,
+						   HeapTupleHeaderGetRawXmin(tuple), state);
 		else
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_INVALID,
+						   InvalidTransactionId, state);
 			return false;
 		}
 	}
@@ -1003,14 +1074,14 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+						   InvalidTransactionId, state);
 			return true;
 		}
 
 		/* xmax transaction committed */
-		SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
-					HeapTupleHeaderGetRawXmax(tuple));
+		SetHintBitsExt(tuple, buffer, HEAP_XMAX_COMMITTED,
+					   HeapTupleHeaderGetRawXmax(tuple), state);
 	}
 	else
 	{
@@ -1607,9 +1678,10 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
  * ->vistuples_dense is set to contain the offsets of visible tuples.
  *
  * The reason this is more efficient than HeapTupleSatisfiesMVCC() is that it
- * avoids a cross-translation-unit function call for each tuple and allows the
- * compiler to optimize across calls to HeapTupleSatisfiesMVCC. In the future
- * it will also allow more efficient setting of hint bits.
+ * avoids a cross-translation-unit function call for each tuple, allows the
+ * compiler to optimize across calls to HeapTupleSatisfiesMVCC and allows
+ * setting hint bits more efficiently (see the one BufferFinishSetHintBits()
+ * call below).
  *
  * Returns the number of visible tuples.
  */
@@ -1620,6 +1692,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 							OffsetNumber *vistuples_dense)
 {
 	int			nvis = 0;
+	SetHintBitsState state = SHB_INITIAL;
 
 	Assert(IsMVCCSnapshot(snapshot));
 
@@ -1628,7 +1701,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		bool		valid;
 		HeapTuple	tup = &batchmvcc->tuples[i];
 
-		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer);
+		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer, &state);
 		batchmvcc->visible[i] = valid;
 
 		if (likely(valid))
@@ -1638,6 +1711,9 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		}
 	}
 
+	if (state == SHB_ENABLED)
+		BufferFinishSetHintBits(buffer, true, true);
+
 	return nvis;
 }
 
@@ -1657,7 +1733,7 @@ HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, Buffer buffer)
 	switch (snapshot->snapshot_type)
 	{
 		case SNAPSHOT_MVCC:
-			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer);
+			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer, NULL);
 		case SNAPSHOT_SELF:
 			return HeapTupleSatisfiesSelf(htup, snapshot, buffer);
 		case SNAPSHOT_ANY:
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 63eda08f7a2..da43af3ec96 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -681,20 +681,28 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				{
 					/*
 					 * The conflicting tuple (or all HOT chains pointed to by
-					 * all posting list TIDs) is dead to everyone, so mark the
-					 * index entry killed.
+					 * all posting list TIDs) is dead to everyone, so try to
+					 * mark the index entry killed. It's ok if we're not
+					 * allowed to, this isn't required for correctness.
 					 */
-					ItemIdMarkDead(curitemid);
-					opaque->btpo_flags |= BTP_HAS_GARBAGE;
+					Buffer		buf;
 
-					/*
-					 * Mark buffer with a dirty hint, since state is not
-					 * crucial. Be sure to mark the proper buffer dirty.
-					 */
+					/* Be sure to operate on the proper buffer */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						buf = nbuf;
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						buf = insertstate->buf;
+
+					/*
+					 * Can't use BufferSetHintBits16() here as we update two
+					 * different locations.
+					 */
+					if (BufferBeginSetHintBits(buf))
+					{
+						ItemIdMarkDead(curitemid);
+						opaque->btpo_flags |= BTP_HAS_GARBAGE;
+						BufferFinishSetHintBits(buf, true, true);
+					}
 				}
 
 				/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 5c50f0dd1bd..a76d90f2d8e 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -357,10 +357,19 @@ _bt_killitems(IndexScanDesc scan)
 			 * it's possible that multiple processes attempt to do this
 			 * simultaneously, leading to multiple full-page images being sent
 			 * to WAL (if wal_log_hints or data checksums are enabled), which
-			 * is undesirable.
+			 * is undesirable.  We need to use the hint bit infrastructure to
+			 * update the page while just holding a share lock.
 			 */
 			if (killtuple && !ItemIdIsDead(iid))
 			{
+				/*
+				 * If we're not able to set hint bits, there's no point
+				 * continuing.
+				 */
+				if (!killedsomething &&
+					!BufferBeginSetHintBits(buf))
+					goto unlock_page;
+
 				/* found the item/all posting list items */
 				ItemIdMarkDead(iid);
 				killedsomething = true;
@@ -371,8 +380,6 @@ _bt_killitems(IndexScanDesc scan)
 	}
 
 	/*
-	 * Since this can be redone later if needed, mark as dirty hint.
-	 *
 	 * Whenever we mark anything LP_DEAD, we also set the page's
 	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.  (Note that we
 	 * only rely on the page-level flag in !heapkeyspace indexes.)
@@ -380,9 +387,10 @@ _bt_killitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
-		MarkBufferDirtyHint(buf, true);
+		BufferFinishSetHintBits(buf, true, true);
 	}
 
+unlock_page:
 	if (!so->dropPin)
 		_bt_unlockbuf(rel, buf);
 	else
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index 119f31b5d65..b3ff5a0e441 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -25,21 +25,26 @@ that might need to do such a wait is instead handled by waiting to obtain
 the relation-level lock, which is why you'd better hold one first.)  Pins
 may not be held across transaction boundaries, however.
 
-Buffer content locks: there are two kinds of buffer lock, shared and exclusive,
-which act just as you'd expect: multiple backends can hold shared locks on
-the same buffer, but an exclusive lock prevents anyone else from holding
-either shared or exclusive lock.  (These can alternatively be called READ
-and WRITE locks.)  These locks are intended to be short-term: they should not
-be held for long.  Buffer locks are acquired and released by LockBuffer().
-It will *not* work for a single backend to try to acquire multiple locks on
-the same buffer.  One must pin a buffer before trying to lock it.
+Buffer content locks: there are three kinds of buffer lock, shared,
+share-exclusive and exclusive:
+a) multiple backends can hold shared locks on the same buffer
+   (alternatively called a READ lock)
+b) one backend can hold a share-exclusive lock on a buffer while multiple
+   backends can hold a share lock
+c) an exclusive lock prevents anyone else from holding shared, share-exclusive
+   or exclusive lock.
+   (alternatively called a WRITE lock)
+
+These locks are intended to be short-term: they should not be held for long.
+Buffer locks are acquired and released by LockBuffer().  It will *not* work
+for a single backend to try to acquire multiple locks on the same buffer.  One
+must pin a buffer before trying to lock it.
 
 Buffer access rules:
 
-1. To scan a page for tuples, one must hold a pin and either shared or
-exclusive content lock.  To examine the commit status (XIDs and status bits)
-of a tuple in a shared buffer, one must likewise hold a pin and either shared
-or exclusive lock.
+1. To scan a page for tuples, one must hold a pin and at least a share lock.
+To examine the commit status (XIDs and status bits) of a tuple in a shared
+buffer, one must likewise hold a pin and at least a share lock.
 
 2. Once one has determined that a tuple is interesting (visible to the
 current transaction) one may drop the content lock, yet continue to access
@@ -55,19 +60,25 @@ one must hold a pin and an exclusive content lock on the containing buffer.
 This ensures that no one else might see a partially-updated state of the
 tuple while they are doing visibility checks.
 
-4. It is considered OK to update tuple commit status bits (ie, OR the
-values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
-HEAP_XMAX_INVALID into t_infomask) while holding only a shared lock and
-pin on a buffer.  This is OK because another backend looking at the tuple
-at about the same time would OR the same bits into the field, so there
-is little or no risk of conflicting update; what's more, if there did
-manage to be a conflict it would merely mean that one bit-update would
-be lost and need to be done again later.  These four bits are only hints
-(they cache the results of transaction status lookups in pg_xact), so no
-great harm is done if they get reset to zero by conflicting updates.
-Note, however, that a tuple is frozen by setting both HEAP_XMIN_INVALID
-and HEAP_XMIN_COMMITTED; this is a critical update and accordingly requires
-an exclusive buffer lock (and it must also be WAL-logged).
+4. Non-critical information on a page ("hint bits") may be modified while
+holding only a share-exclusive lock and pin on the page. To do so in cases
+where only a share lock is already held, use BufferBeginSetHintBits() &
+BufferFinishSetHintBits() (if multiple hint bits are to be set) or
+BufferSetHintBits16() (if a single hint bit is set).
+
+E.g. for heapam, a share-exclusive lock allows to update tuple commit status
+bits (ie, OR the values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID,
+HEAP_XMAX_COMMITTED, or HEAP_XMAX_INVALID into t_infomask) while holding only
+a share-exclusive lock and pin on a buffer.  This is OK because another
+backend looking at the tuple at about the same time would OR the same bits
+into the field, so there is little or no risk of conflicting update; what's
+more, if there did manage to be a conflict it would merely mean that one
+bit-update would be lost and need to be done again later.  These four bits are
+only hints (they cache the results of transaction status lookups in pg_xact),
+so no great harm is done if they get reset to zero by conflicting updates.
+Note, however, that a tuple is frozen by setting both HEAP_XMIN_INVALID and
+HEAP_XMIN_COMMITTED; this is a critical update and accordingly requires an
+exclusive buffer lock (and it must also be WAL-logged).
 
 5. To physically remove a tuple or compact free space on a page, one
 must hold a pin and an exclusive lock, *and* observe while holding the
@@ -80,7 +91,6 @@ buffer (increment the refcount) while one is performing the cleanup, but
 it won't be able to actually examine the page until it acquires shared
 or exclusive content lock.
 
-
 Obtaining the lock needed under rule #5 is done by the bufmgr routines
 LockBufferForCleanup() or ConditionalLockBufferForCleanup().  They first get
 an exclusive lock and then check to see if the shared pin count is currently
@@ -96,6 +106,10 @@ VACUUM's use, since we don't allow multiple VACUUMs concurrently on a single
 relation anyway.  Anyone wishing to obtain a cleanup lock outside of recovery
 or a VACUUM must use the conditional variant of the function.
 
+6. To write out a buffer, a share-exclusive lock needs to be held. This
+prevents the buffer from being modified while written out, which could corrupt
+checksums and cause issues on the OS or device level when direct-IO is used.
+
 
 Buffer Manager's Internal Locking
 ---------------------------------
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 6adf04903cb..402bf81b269 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2480,9 +2480,8 @@ again:
 	/*
 	 * If the buffer was dirty, try to write it out.  There is a race
 	 * condition here, in that someone might dirty it after we released the
-	 * buffer header lock above, or even while we are writing it out (since
-	 * our share-lock won't prevent hint-bit updates).  We will recheck the
-	 * dirty bit after re-locking the buffer header.
+	 * buffer header lock above.  We will recheck the dirty bit after
+	 * re-locking the buffer header.
 	 */
 	if (buf_state & BM_DIRTY)
 	{
@@ -2490,20 +2489,20 @@ again:
 		Assert(buf_state & BM_VALID);
 
 		/*
-		 * We need a share-lock on the buffer contents to write it out (else
-		 * we might write invalid data, eg because someone else is compacting
-		 * the page contents while we write).  We must use a conditional lock
-		 * acquisition here to avoid deadlock.  Even though the buffer was not
-		 * pinned (and therefore surely not locked) when StrategyGetBuffer
-		 * returned it, someone else could have pinned and exclusive-locked it
-		 * by the time we get here. If we try to get the lock unconditionally,
-		 * we'd block waiting for them; if they later block waiting for us,
-		 * deadlock ensues. (This has been observed to happen when two
-		 * backends are both trying to split btree index pages, and the second
-		 * one just happens to be trying to split the page the first one got
-		 * from StrategyGetBuffer.)
+		 * We need a share-exclusive lock on the buffer contents to write it
+		 * out (else we might write invalid data, eg because someone else is
+		 * compacting the page contents while we write).  We must use a
+		 * conditional lock acquisition here to avoid deadlock.  Even though
+		 * the buffer was not pinned (and therefore surely not locked) when
+		 * StrategyGetBuffer returned it, someone else could have pinned and
+		 * (share-)exclusive-locked it by the time we get here. If we try to
+		 * get the lock unconditionally, we'd block waiting for them; if they
+		 * later block waiting for us, deadlock ensues. (This has been
+		 * observed to happen when two backends are both trying to split btree
+		 * index pages, and the second one just happens to be trying to split
+		 * the page the first one got from StrategyGetBuffer.)
 		 */
-		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE))
+		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE_EXCLUSIVE))
 		{
 			/*
 			 * Someone else has locked the buffer, so give it up and loop back
@@ -4072,8 +4071,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	}
 
 	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
+	 * Pin it, share-exclusive-lock it, write it.  (FlushBuffer will do
+	 * nothing if the buffer is clean by the time we've locked it.)
 	 */
 	PinBuffer_Locked(bufHdr);
 
@@ -4403,11 +4402,8 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
- * buffer contents.  (Note: a share-lock does not prevent updates of
- * hint bits in the buffer, so the page could change while the write
- * is in progress, but we assume that that will not invalidate the data
- * written.)
+ * The caller must hold a pin on the buffer and have
+ * (share-)exclusively-locked the buffer contents.
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
  * as the second parameter.  If not, pass NULL.
@@ -4423,6 +4419,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	char	   *bufToWrite;
 	uint64		buf_state;
 
+	Assert(BufferLockHeldByMeInMode(buf, BUFFER_LOCK_EXCLUSIVE) ||
+		   BufferLockHeldByMeInMode(buf, BUFFER_LOCK_SHARE_EXCLUSIVE));
+
 	/*
 	 * Try to start an I/O operation.  If StartBufferIO returns false, then
 	 * someone else flushed the buffer before we could, so we need not do
@@ -4555,7 +4554,7 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 {
 	Buffer		buffer = BufferDescriptorGetBuffer(buf);
 
-	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE);
+	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE_EXCLUSIVE);
 	FlushBuffer(buf, reln, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
 	BufferLockUnlock(buffer, buf);
 }
@@ -5474,8 +5473,8 @@ FlushDatabaseBuffers(Oid dbid)
 }
 
 /*
- * Flush a previously, shared or exclusively, locked and pinned buffer to the
- * OS.
+ * Flush a previously, share-exclusively or exclusively, locked and pinned
+ * buffer to the OS.
  */
 void
 FlushOneBuffer(Buffer buffer)
@@ -5548,39 +5547,24 @@ IncrBufferRefCount(Buffer buffer)
 }
 
 /*
- * MarkBufferDirtyHint
+ * Shared-buffer only helper for MarkBufferDirtyHint() and
+ * BufferSetHintBits16().
  *
- *	Mark a buffer dirty for non-critical changes.
- *
- * This is essentially the same as MarkBufferDirty, except:
- *
- * 1. The caller does not write WAL; so if checksums are enabled, we may need
- *	  to write an XLOG_FPI_FOR_HINT WAL record to protect against torn pages.
- * 2. The caller might have only share-lock instead of exclusive-lock on the
- *	  buffer's content lock.
- * 3. This function does not guarantee that the buffer is always marked dirty
- *	  (due to a race condition), so it cannot be used for important changes.
+ * This is separated out because it turns out that the repeated checks for
+ * local buffers, repeated GetBufferDescriptor() and repeated reading of the
+ * buffer's state sufficiently hurts the performance of BufferSetHintBits16().
  */
-void
-MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
+static inline void
+MarkSharedBufferDirtyHint(Buffer buffer, BufferDesc *bufHdr, uint64 lockstate,
+						  bool buffer_std)
 {
-	BufferDesc *bufHdr;
 	Page		page = BufferGetPage(buffer);
 
-	if (!BufferIsValid(buffer))
-		elog(ERROR, "bad buffer ID: %d", buffer);
-
-	if (BufferIsLocal(buffer))
-	{
-		MarkLocalBufferDirty(buffer);
-		return;
-	}
-
-	bufHdr = GetBufferDescriptor(buffer - 1);
-
 	Assert(GetPrivateRefCount(buffer) > 0);
-	/* here, either share or exclusive lock is OK */
-	Assert(BufferIsLockedByMe(buffer));
+
+	/* here, either share-exclusive or exclusive lock is OK */
+	Assert(BufferLockHeldByMeInMode(bufHdr, BUFFER_LOCK_EXCLUSIVE) ||
+		   BufferLockHeldByMeInMode(bufHdr, BUFFER_LOCK_SHARE_EXCLUSIVE));
 
 	/*
 	 * This routine might get called many times on the same page, if we are
@@ -5593,8 +5577,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	 * is only intended to be used in cases where failing to write out the
 	 * data would be harmless anyway, it doesn't really matter.
 	 */
-	if ((pg_atomic_read_u64(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
-		(BM_DIRTY | BM_JUST_DIRTIED))
+	if (unlikely((lockstate & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+				 (BM_DIRTY | BM_JUST_DIRTIED)))
 	{
 		XLogRecPtr	lsn = InvalidXLogRecPtr;
 		bool		dirtied = false;
@@ -5610,8 +5594,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		 * We don't check full_page_writes here because that logic is included
 		 * when we call XLogInsert() since the value changes dynamically.
 		 */
-		if (XLogHintBitIsNeeded() &&
-			(pg_atomic_read_u64(&bufHdr->state) & BM_PERMANENT))
+		if (XLogHintBitIsNeeded() && (lockstate & BM_PERMANENT))
 		{
 			/*
 			 * If we must not write WAL, due to a relfilelocator-specific
@@ -5663,13 +5646,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 			dirtied = true;		/* Means "will be dirtied by this action" */
 
 			/*
-			 * Set the page LSN if we wrote a backup block. We aren't supposed
-			 * to set this when only holding a share lock but as long as we
-			 * serialise it somehow we're OK. We choose to set LSN while
-			 * holding the buffer header lock, which causes any reader of an
-			 * LSN who holds only a share lock to also obtain a buffer header
-			 * lock before using PageGetLSN(), which is enforced in
-			 * BufferGetLSNAtomic().
+			 * Set the page LSN if we wrote a backup block. To allow backends
+			 * that only hold a share lock on the buffer to read the LSN in a
+			 * tear-free manner, we set the page LSN while holding the buffer
+			 * header lock. This allows any reader of an LSN who holds only a
+			 * share lock to also obtain a buffer header lock before using
+			 * PageGetLSN() to read the LSN in a tear free way. This is done
+			 * in BufferGetLSNAtomic().
 			 *
 			 * If checksums are enabled, you might think we should reset the
 			 * checksum here. That will happen when the page is written
@@ -5695,6 +5678,41 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	}
 }
 
+/*
+ * MarkBufferDirtyHint
+ *
+ *	Mark a buffer dirty for non-critical changes.
+ *
+ * This is essentially the same as MarkBufferDirty, except:
+ *
+ * 1. The caller does not write WAL; so if checksums are enabled, we may need
+ *	  to write an XLOG_FPI_FOR_HINT WAL record to protect against torn pages.
+ * 2. The caller might have only a share-exclusive-lock instead of an
+ *	  exclusive-lock on the buffer's content lock.
+ * 3. This function does not guarantee that the buffer is always marked dirty
+ *	  (due to a race condition), so it cannot be used for important changes.
+ */
+inline void
+MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
+{
+	BufferDesc *bufHdr;
+
+	bufHdr = GetBufferDescriptor(buffer - 1);
+
+	if (!BufferIsValid(buffer))
+		elog(ERROR, "bad buffer ID: %d", buffer);
+
+	if (BufferIsLocal(buffer))
+	{
+		MarkLocalBufferDirty(buffer);
+		return;
+	}
+
+	MarkSharedBufferDirtyHint(buffer, bufHdr,
+							  pg_atomic_read_u64(&bufHdr->state),
+							  buffer_std);
+}
+
 /*
  * Release buffer content locks for shared buffers.
  *
@@ -6789,6 +6807,187 @@ IsBufferCleanupOK(Buffer buffer)
 	return false;
 }
 
+/*
+ * Helper for BufferBeginSetHintBits() and BufferSetHintBits16().
+ *
+ * This checks if the current lock mode already suffices to allow hint bits
+ * being set and, if not, whether the current lock can be upgraded.
+ */
+static inline bool
+SharedBufferBeginSetHintBits(Buffer buffer, BufferDesc *buf_hdr, uint64 *lockstate)
+{
+	uint64		old_state;
+	PrivateRefCountEntry *ref;
+	BufferLockMode mode;
+
+	ref = GetPrivateRefCountEntry(buffer, true);
+
+	if (ref == NULL)
+		elog(ERROR, "lock is not held");
+
+	mode = ref->data.lockmode;
+	if (mode == BUFFER_LOCK_UNLOCK)
+		elog(ERROR, "buffer is not locked");
+
+	/*
+	 * Already am holding a sufficient lock level.
+	 */
+	if (mode == BUFFER_LOCK_EXCLUSIVE || mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+	{
+		*lockstate = pg_atomic_read_u64(&buf_hdr->state);
+		return true;
+	}
+
+	/*
+	 * Only holding a share lock right now, try to upgrade to SHARE_EXCLUSIVE.
+	 */
+	Assert(mode == BUFFER_LOCK_SHARE);
+
+	old_state = pg_atomic_read_u64(&buf_hdr->state);
+	while (true)
+	{
+		uint64		desired_state;
+
+		desired_state = old_state;
+
+		/*
+		 * Can't upgrade if somebody else holds the lock in exclusive or
+		 * share-exclusive mode.
+		 */
+		if (unlikely((old_state & (BM_LOCK_VAL_EXCLUSIVE | BM_LOCK_VAL_SHARE_EXCLUSIVE)) != 0))
+		{
+			return false;
+		}
+
+		/* currently held lock state */
+		desired_state -= BM_LOCK_VAL_SHARED;
+
+		/* new lock level */
+		desired_state += BM_LOCK_VAL_SHARE_EXCLUSIVE;
+
+		if (likely(pg_atomic_compare_exchange_u64(&buf_hdr->state,
+												  &old_state, desired_state)))
+		{
+			ref->data.lockmode = BUFFER_LOCK_SHARE_EXCLUSIVE;
+			*lockstate = desired_state;
+
+			return true;
+		}
+	}
+
+}
+
+/*
+ * Try to acquire the right to set hint bits on the buffer.
+ *
+ * To be allowed to set hint bits, this backend needs to hold either a
+ * share-exclusive or an exclusive lock. In case this backend only holds a
+ * share lock, this function will try to upgrade the lock to
+ * share-exclusive. The caller is only allowed to set hint bits if true is
+ * returned.
+ *
+ * Once BufferBeginSetHintBits() has returned true, hint bits may be set
+ * without further calls to BufferBeginSetHintBits(), until the buffer is
+ * unlocked.
+ *
+ *
+ * Requiring a share-exclusive lock to set hint bits prevents setting hint
+ * bits on buffers that are currently being written out, which could corrupt
+ * the checksum on the page. Flushing buffers also requires a share-exclusive
+ * lock.
+ *
+ * Due to a lock >= share-exclusive being required to set hint bits, only one
+ * backend can set hint bits at a time. Allowing multiple backends to hint
+ * bits would require more complicated locking: For setting hint bits we'd
+ * need to store the count of backends currently setting hint bits, for I/O we
+ * would need another lock-level conflicting with the hint-setting
+ * lock-level. Given that the share-exclusive lock for setting hint bits is
+ * only held for a short time, that backends often would just set the same
+ * hint bits and that the cost of occasionally not setting hint bits in hotly
+ * accessed pages is fairly low, this seems like an acceptable tradeoff.
+ */
+bool
+BufferBeginSetHintBits(Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+	uint64		lockstate;
+
+	if (BufferIsLocal(buffer))
+	{
+		/*
+		 * NB: Will need to check if there is a write in progress, once it is
+		 * possible for writes to be done asynchronously.
+		 */
+		return true;
+	}
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+
+	return SharedBufferBeginSetHintBits(buffer, buf_hdr, &lockstate);
+}
+
+/*
+ * End a phase of setting hint bits on this buffer, started with
+ * BufferBeginSetHintBits().
+ *
+ * This would strictly speaking not be required (i.e. the caller could do
+ * MarkBufferDirtyHint() if so desired), but allows us to perform some sanity
+ * checks.
+ */
+void
+BufferFinishSetHintBits(Buffer buffer, bool mark_dirty, bool buffer_std)
+{
+	if (!BufferIsLocal(buffer))
+		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_SHARE_EXCLUSIVE) ||
+			   BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
+
+	if (mark_dirty)
+		MarkBufferDirtyHint(buffer, buffer_std);
+}
+
+/*
+ * Try to set a single hint bit in a buffer.
+ *
+ * This is a bit faster than BufferBeginSetHintBits() /
+ * BufferFinishSetHintBits() when setting a single hint bit, but slower than
+ * the former when setting several hint bits.
+ */
+bool
+BufferSetHintBits16(uint16 *ptr, uint16 val, Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+	uint64		lockstate;
+#ifdef USE_ASSERT_CHECKING
+	char	   *page;
+
+	/* verify that the address is on the page */
+	page = BufferGetPage(buffer);
+	Assert((char *) ptr >= page && (char *) ptr < (page + BLCKSZ));
+#endif
+
+	if (BufferIsLocal(buffer))
+	{
+		*ptr = val;
+
+		MarkLocalBufferDirty(buffer);
+
+		return true;
+	}
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+
+	if (SharedBufferBeginSetHintBits(buffer, buf_hdr, &lockstate))
+	{
+		*ptr = val;
+
+		MarkSharedBufferDirtyHint(buffer, buf_hdr, lockstate, true);
+
+		return true;
+	}
+
+	return false;
+}
+
 
 /*
  *	Functions for buffer I/O handling
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index ad337c00871..b9a8f368a63 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -904,13 +904,17 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 	max_avail = fsm_get_max_avail(page);
 
 	/*
-	 * Reset the next slot pointer. This encourages the use of low-numbered
-	 * pages, increasing the chances that a later vacuum can truncate the
-	 * relation. We don't bother with marking the page dirty if it wasn't
-	 * already, since this is just a hint.
+	 * Try to reset the next slot pointer. This encourages the use of
+	 * low-numbered pages, increasing the chances that a later vacuum can
+	 * truncate the relation. We don't bother with marking the page dirty if
+	 * it wasn't already, since this is just a hint.
 	 */
 	LockBuffer(buf, BUFFER_LOCK_SHARE);
-	((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+	if (BufferBeginSetHintBits(buf))
+	{
+		((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+		BufferFinishSetHintBits(buf, false, false);
+	}
 	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
 	ReleaseBuffer(buf);
diff --git a/src/backend/storage/freespace/fsmpage.c b/src/backend/storage/freespace/fsmpage.c
index 33ee825529c..e46bf2631fc 100644
--- a/src/backend/storage/freespace/fsmpage.c
+++ b/src/backend/storage/freespace/fsmpage.c
@@ -298,9 +298,18 @@ restart:
 	 * lock and get a garbled next pointer every now and then, than take the
 	 * concurrency hit of an exclusive lock.
 	 *
+	 * Without an exclusive lock, we need to use the hint bit infrastructure
+	 * to be allowed to modify the page.
+	 *
 	 * Wrap-around is handled at the beginning of this function.
 	 */
-	fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+	if (exclusive_lock_held || BufferBeginSetHintBits(buf))
+	{
+		fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+
+		if (!exclusive_lock_held)
+			BufferFinishSetHintBits(buf, false, true);
+	}
 
 	return slot;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 14dec2d49c1..efea48fcef7 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2750,6 +2750,7 @@ SetConstraintStateData
 SetConstraintTriggerData
 SetExprState
 SetFunctionReturnMode
+SetHintBitsState
 SetOp
 SetOpCmd
 SetOpPath
-- 
2.48.1.76.g4e746b1a31.dirty



  [text/x-diff] v11-0006-WIP-bufmgr-Don-t-copy-pages-while-writing-out.patch (11.6K, 7-v11-0006-WIP-bufmgr-Don-t-copy-pages-while-writing-out.patch)
  download | inline diff:
From 3e8707ee794245aee211b7019e512db3ab6da214 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 13 Jan 2026 20:10:32 -0500
Subject: [PATCH v11 6/7] WIP: bufmgr: Don't copy pages while writing out

After the series of preceding commits introducing and using
BufferBeginSetHintBits()/BufferSetHintBits16() hint bits are not set
anymore while IO is going on. Therefore we do not need to copy pages while
they are being written out anymore.

TODO: Update comments

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufpage.h           |  3 +-
 src/backend/access/hash/hashpage.c      |  2 +-
 src/backend/access/transam/xloginsert.c | 43 ++++++----------------
 src/backend/storage/buffer/bufmgr.c     | 21 +++++------
 src/backend/storage/buffer/localbuf.c   |  2 +-
 src/backend/storage/page/bufpage.c      | 48 ++++---------------------
 src/backend/storage/smgr/bulk_write.c   |  2 +-
 src/test/modules/test_aio/test_aio.c    |  2 +-
 8 files changed, 33 insertions(+), 90 deletions(-)

diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index ae3725b3b81..31ec9a8a047 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -504,7 +504,6 @@ extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
 extern void PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum);
 extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									const void *newtup, Size newsize);
-extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
-extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern void PageSetChecksum(Page page, BlockNumber blkno);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 8e220a3ae16..52c20208c66 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -1029,7 +1029,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 					zerobuf.data,
 					true);
 
-	PageSetChecksumInplace(page, lastblock);
+	PageSetChecksum(page, lastblock);
 	smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, lastblock, zerobuf.data,
 			   false);
 
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 92c48e768c3..53cfdce8de8 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -261,8 +261,11 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 	 */
 #ifdef USE_ASSERT_CHECKING
 	if (!(flags & REGBUF_NO_CHANGE))
-		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE) &&
-			   BufferIsDirty(buffer));
+	{
+		Assert(BufferIsDirty(buffer));
+		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE) ||
+			   BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_SHARE_EXCLUSIVE));
+	}
 #endif
 
 	if (block_id >= max_registered_block_id)
@@ -1066,7 +1069,7 @@ XLogCheckBufferNeedsBackup(Buffer buffer)
  * Write a backup block if needed when we are setting a hint. Note that
  * this may be called for a variety of page types, not just heaps.
  *
- * Callable while holding just share lock on the buffer content.
+ * Callable while holding just a share-exclusive lock on the buffer content.
  *
  * We can't use the plain backup block mechanism since that relies on the
  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
@@ -1074,6 +1077,8 @@ XLogCheckBufferNeedsBackup(Buffer buffer)
  * failures. So instead we copy the page and insert the copied data as normal
  * record data.
  *
+ * FIXME: outdated
+ *
  * We only need to do something if page has not yet been full page written in
  * this checkpoint round. The LSN of the inserted wal record is returned if we
  * had to write, InvalidXLogRecPtr otherwise.
@@ -1102,46 +1107,20 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
 
 	/*
 	 * We assume page LSN is first data on *every* page that can be passed to
-	 * XLogInsert, whether it has the standard page layout or not. Since we're
-	 * only holding a share-lock on the page, we must take the buffer header
-	 * lock when we look at the LSN.
+	 * XLogInsert, whether it has the standard page layout or not.
 	 */
 	lsn = BufferGetLSNAtomic(buffer);
 
 	if (lsn <= RedoRecPtr)
 	{
-		int			flags = 0;
-		PGAlignedBlock copied_buffer;
-		char	   *origdata = (char *) BufferGetBlock(buffer);
-		RelFileLocator rlocator;
-		ForkNumber	forkno;
-		BlockNumber blkno;
-
-		/*
-		 * Copy buffer so we don't have to worry about concurrent hint bit or
-		 * lsn updates. We assume pd_lower/upper cannot be changed without an
-		 * exclusive lock, so the contents bkp are not racy.
-		 */
-		if (buffer_std)
-		{
-			/* Assume we can omit data between pd_lower and pd_upper */
-			Page		page = BufferGetPage(buffer);
-			uint16		lower = ((PageHeader) page)->pd_lower;
-			uint16		upper = ((PageHeader) page)->pd_upper;
-
-			memcpy(copied_buffer.data, origdata, lower);
-			memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
-		}
-		else
-			memcpy(copied_buffer.data, origdata, BLCKSZ);
+		int			flags = REGBUF_NO_CHANGE;
 
 		XLogBeginInsert();
 
 		if (buffer_std)
 			flags |= REGBUF_STANDARD;
 
-		BufferGetTag(buffer, &rlocator, &forkno, &blkno);
-		XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
+		XLogRegisterBuffer(0, buffer, flags);
 
 		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
 	}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 402bf81b269..097c8fa67c5 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4416,7 +4416,6 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	ErrorContextCallback errcallback;
 	instr_time	io_start;
 	Block		bufBlock;
-	char	   *bufToWrite;
 	uint64		buf_state;
 
 	Assert(BufferLockHeldByMeInMode(buf, BUFFER_LOCK_EXCLUSIVE) ||
@@ -4487,12 +4486,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 */
 	bufBlock = BufHdrGetBlock(buf);
 
-	/*
-	 * Update page checksum if desired.  Since we have only shared lock on the
-	 * buffer, other processes might be updating hint bits in it, so we must
-	 * copy the page to private storage if we do checksumming.
-	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
+	/* Update page checksum if desired. */
+	PageSetChecksum((Page) bufBlock, buf->tag.blockNum);
 
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
@@ -4502,7 +4497,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	smgrwrite(reln,
 			  BufTagGetForkNum(&buf->tag),
 			  buf->tag.blockNum,
-			  bufToWrite,
+			  bufBlock,
 			  false);
 
 	/*
@@ -4626,8 +4621,8 @@ BufferIsPermanent(Buffer buffer)
 /*
  * BufferGetLSNAtomic
  *		Retrieves the LSN of the buffer atomically using a buffer header lock.
- *		This is necessary for some callers who may not have an exclusive lock
- *		on the buffer.
+ *		This is necessary for some callers who may not have a (share-)exclusive
+ *		lock on the buffer.
  */
 XLogRecPtr
 BufferGetLSNAtomic(Buffer buffer)
@@ -5630,6 +5625,12 @@ MarkSharedBufferDirtyHint(Buffer buffer, BufferDesc *bufHdr, uint64 lockstate,
 			 * It's possible we may enter here without an xid, so it is
 			 * essential that CreateCheckPoint waits for virtual transactions
 			 * rather than full transactionids.
+			 *
+			 * FIXME: I think we now should simply mark the page dirty before
+			 * WAL logging the hint bit - afaict it then should work just like
+			 * any other buffer write (due to SyncBuffers()/SyncOneBuffer()
+			 * seeing the dirty bit and trying to lock the page
+			 * share-exclusive, and thus having to wait).
 			 */
 			Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
 			MyProc->delayChkptFlags |= DELAY_CHKPT_START;
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 04a540379a2..55e17e03acb 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -199,7 +199,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
 						MyProcNumber);
 
-	PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+	PageSetChecksum(localpage, bufHdr->tag.blockNum);
 
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index de85911e3ac..2072bb1c72c 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1494,51 +1494,15 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 /*
  * Set checksum for a page in shared buffers.
  *
- * If checksums are disabled, or if the page is not initialized, just return
- * the input.  Otherwise, we must make a copy of the page before calculating
- * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
- * from making the final checksum invalid.  It doesn't matter if we include or
- * exclude hints during the copy, as long as we write a valid page and
- * associated checksum.
+ * If checksums are disabled, or if the page is not initialized, just
+ * return. Otherwise compute and set the checksum.
  *
- * Returns a pointer to the block-sized data that needs to be written. Uses
- * statically-allocated memory, so the caller must immediately write the
- * returned page and not refer to it again.
- */
-char *
-PageSetChecksumCopy(Page page, BlockNumber blkno)
-{
-	static char *pageCopy = NULL;
-
-	/* If we don't need a checksum, just return the passed-in data */
-	if (PageIsNew(page) || !DataChecksumsEnabled())
-		return page;
-
-	/*
-	 * We allocate the copy space once and use it over on each subsequent
-	 * call.  The point of palloc'ing here, rather than having a static char
-	 * array, is first to ensure adequate alignment for the checksumming code
-	 * and second to avoid wasting space in processes that never call this.
-	 */
-	if (pageCopy == NULL)
-		pageCopy = MemoryContextAllocAligned(TopMemoryContext,
-											 BLCKSZ,
-											 PG_IO_ALIGN_SIZE,
-											 0);
-
-	memcpy(pageCopy, page, BLCKSZ);
-	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
-	return pageCopy;
-}
-
-/*
- * Set checksum for a page in private memory.
- *
- * This must only be used when we know that no other process can be modifying
- * the page buffer.
+ * In the past this needed to be done on a copy of the page, due to the
+ * possibility of e.g. hint bits being set concurrently. However, this is not
+ * necessary anymore as hint bits won't be set while IO is going on.
  */
 void
-PageSetChecksumInplace(Page page, BlockNumber blkno)
+PageSetChecksum(Page page, BlockNumber blkno)
 {
 	/* If we don't need a checksum, just return */
 	if (PageIsNew(page) || !DataChecksumsEnabled())
diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 36b28824ec8..f3c24082a69 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -279,7 +279,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		BlockNumber blkno = pending_writes[i].blkno;
 		Page		page = pending_writes[i].buf->data;
 
-		PageSetChecksumInplace(page, blkno);
+		PageSetChecksum(page, blkno);
 
 		if (blkno >= bulkstate->relsize)
 		{
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..2ae4a559fab 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -288,7 +288,7 @@ modify_rel_block(PG_FUNCTION_ARGS)
 	}
 	else
 	{
-		PageSetChecksumInplace(page, blkno);
+		PageSetChecksum(page, blkno);
 	}
 
 	smgrwrite(RelationGetSmgr(rel),
-- 
2.48.1.76.g4e746b1a31.dirty



  [text/x-diff] v11-0007-WIP-Make-UnlockReleaseBuffer-more-efficient.patch (3.5K, 8-v11-0007-WIP-Make-UnlockReleaseBuffer-more-efficient.patch)
  download | inline diff:
From 7f263a752faf5017cceb98286c248dbb395b281c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 13 Jan 2026 20:10:32 -0500
Subject: [PATCH v11 7/7] WIP: Make UnlockReleaseBuffer() more efficient

Now that the buffer content lock is implemented as part of BufferDesc.state,
releasing the lock and unpinning the buffer can be implemented as a single
atomic operation.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/nbtree/nbtpage.c | 22 +++++++++++-
 src/backend/storage/buffer/bufmgr.c | 52 ++++++++++++++++++++++++++++-
 2 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 4125c185e8b..f3e3f67e1fd 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1007,11 +1007,18 @@ _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access)
 
 	Assert(BlockNumberIsValid(blkno));
 	if (BufferIsValid(obuf))
+	{
+		_bt_relbuf(rel, obuf);
+#if 0
+		Assert(BufferGetBlockNumber(obuf) != blkno);
 		_bt_unlockbuf(rel, obuf);
-	buf = ReleaseAndReadBuffer(obuf, rel, blkno);
+#endif
+	}
+	buf = ReadBuffer(rel, blkno);
 	_bt_lockbuf(rel, buf, access);
 
 	_bt_checkpage(rel, buf);
+
 	return buf;
 }
 
@@ -1023,8 +1030,21 @@ _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access)
 void
 _bt_relbuf(Relation rel, Buffer buf)
 {
+#if 0
 	_bt_unlockbuf(rel, buf);
 	ReleaseBuffer(buf);
+#else
+	/*
+	 * Buffer is pinned and locked, which means that it is expected to be
+	 * defined and addressable.  Check that proactively.
+	 */
+	VALGRIND_CHECK_MEM_IS_DEFINED(BufferGetPage(buf), BLCKSZ);
+
+	UnlockReleaseBuffer(buf);
+
+	if (!RelationUsesLocalBuffers(rel))
+		VALGRIND_MAKE_MEM_NOACCESS(BufferGetPage(buf), BLCKSZ);
+#endif
 }
 
 /*
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 097c8fa67c5..a3a11595b5d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5506,13 +5506,63 @@ ReleaseBuffer(Buffer buffer)
 /*
  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
  *
- * This is just a shorthand for a common combination.
+ * This is just a, more efficient, shorthand for a common combination.
  */
 void
 UnlockReleaseBuffer(Buffer buffer)
 {
+#if 1
+	int			mode;
+	BufferDesc *buf;
+	PrivateRefCountEntry *ref;
+	uint64		sub;
+	uint64		lockstate;
+
+	if (!BufferIsValid(buffer))
+		elog(ERROR, "bad buffer ID: %d", buffer);
+
+	if (BufferIsLocal(buffer))
+	{
+		UnpinLocalBuffer(buffer);
+		return;
+	}
+
+	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
+
+	buf = GetBufferDescriptor(buffer - 1);
+
+	mode = BufferLockDisownInternal(buffer, buf);
+
+	/* compute state modification for lock release */
+	sub = BufferLockReleaseSub(mode);
+
+	/* compute state modification for pin release */
+	ref = GetPrivateRefCountEntry(buffer, false);
+	Assert(ref != NULL);
+	Assert(ref->data.refcount > 0);
+	ref->data.refcount--;
+
+	if (ref->data.refcount == 0)
+	{
+		sub |= BUF_REFCOUNT_ONE;
+		ForgetPrivateRefCountEntry(ref);
+	}
+
+	/* perform the lock and pin release in one atomic op */
+	lockstate = pg_atomic_sub_fetch_u64(&buf->state, sub);
+
+	/* wake up waiters etc */
+	BufferLockProcessRelease(buf, mode, lockstate);
+
+	if (lockstate & BM_PIN_COUNT_WAITER)
+		WakePinCountWaiter(buf);
+
+	RESUME_INTERRUPTS();
+
+#else
 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 	ReleaseBuffer(buffer);
+#endif
 }
 
 /*
-- 
2.48.1.76.g4e746b1a31.dirty



view thread (35+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
  Subject: Re: Buffer locking is special (hints, checksums, AIO writes)
  In-Reply-To: <k5j77f3q6ztihnjnx2nqxzyor6fbj2qxcbhzuxhkh2yy63jyfg@p72phigar3n4>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox