From: Chao Li <[email protected]>
To: PostgreSQL Hackers <[email protected]>
Subject: Re: PoC: Add condition variable support to WaitEventSetWait()
Date: Thu, 2 Apr 2026 15:38:48 +0800
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
> On Mar 31, 2026, at 16:59, Chao Li <[email protected]> wrote:
>
>
>
>> On Mar 31, 2026, at 15:28, Chao Li <[email protected]> wrote:
>>
>> Hi,
>>
>> There is an XXX comment in WalSndWait():
>> ```
>> * XXX: A desirable future improvement would be to add support for CVs
>> * into WaitEventSetWait().
>> ```
>>
>> I have been exploring a possible approach for that. This patch is a PoC that adds ConditionVariable support to WaitEventSet. This v1 is mainly intended to gather feedback on the design, so I have only done some basic testing so far, such as a normal logical replication workflow.
>>
>> I’d like to highlight a few key points about the design:
>>
>> 1. In the current WalSndWait(), although it prepares to sleep on a ConditionVariable, it does not actually check whether the CV has been signaled. In this PoC, I kept that same behavior. However, I tried to make the WaitEventSet support for CVs generic, so that if we want to add actual signal checking in the future, that would be possible.
>>
>> 2. To keep the design generic, this patch introduces a new wait event type, WL_CONDITION_VARIABLE. A WL_CONDITION_VARIABLE event occupies a position in the event array, similar to latch and socket events. When a CV is signaled, the corresponding WL_CONDITION_VARIABLE event is returned in occurred_events.
>>
>> 3. The WaitEventSet APIs AddWaitEventToSet() and ModifyWaitEvent() are extended to support CVs by adding one more parameter "cv" to both APIs. The downside of this approach is that all call sites of these two APIs need to be updated. I also considered adding separate APIs for CVs, such as AddWaitEventToSetForCV() and ModifyWaitEventForCV(), since CVs do not rely on the kernel and it might therefore make sense to decouple them from socket and latch handling. But for v1, I chose the more generic approach. I’d be interested in hearing comments on this part of the design.
>>
>> 4. One important point is that this patch extends the WaitEventSet abstraction, not the underlying kernel wait primitives. A ConditionVariable is still a userspace/shared-memory concept, but with this design it can participate in the same waiting framework as sockets and latches. I think that is useful because it allows mixed waits to be handled through one interface.
>>
>> Here is the v1 patch.
>>
>> Best regards,
>> --
>> Chao Li (Evan)
>> HighGo Software Co., Ltd.
>> https://www.highgo.com/
>>
>>
>>
>>
>> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
>
>
> I just noticed that I missed checking in my last edit when switching to the other branch, so attaching an updated v1.
>
> Best regards,
> --
> Chao Li (Evan)
> HighGo Software Co., Ltd.
> https://www.highgo.com/
>
>
>
>
> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
Please find attached v2, which fixes a CI failure in contrib/postgres_fdw.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
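
The add/modify rules from design points 2 and 3 can be sketched outside PostgreSQL. This is a minimal, self-contained model and not the patch's code: every `Toy*` / `toy_*` name is hypothetical, `toy_modify_cv()` covers only the CV path of ModifyWaitEvent(), and a `-1` / `false` return stands in for the places where the patch calls elog(ERROR, ...).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the WL_* flags; only the CV bit matters here. */
#define TOY_WL_LATCH_SET          (1u << 0)
#define TOY_WL_CONDITION_VARIABLE (1u << 9)
#define TOY_MAX_EVENTS            4

typedef struct ToyCV { int dummy; } ToyCV;

typedef struct ToyEventSet
{
	unsigned	events[TOY_MAX_EVENTS];	/* event mask per position */
	int			nevents;
	const ToyCV *cv;		/* the single CV the set may wait on */
	int			cv_pos;		/* position of the CV event */
} ToyEventSet;

/* Returns the new event's position, or -1 where the patch would raise an error. */
int
toy_add_event(ToyEventSet *set, unsigned events, const ToyCV *cv)
{
	if (set->nevents >= TOY_MAX_EVENTS)
		return -1;

	/* A NULL cv is allowed at add time; it can be supplied later. */
	if (cv != NULL)
	{
		if (set->cv != NULL)
			return -1;		/* more than one condition variable */
		if ((events & TOY_WL_CONDITION_VARIABLE) == 0)
			return -1;		/* cv passed with a non-CV event */
	}

	set->events[set->nevents] = events;
	if (events == TOY_WL_CONDITION_VARIABLE)
	{
		set->cv = cv;
		set->cv_pos = set->nevents;
	}
	return set->nevents++;
}

/* Fast path: only the CV pointer changes, no kernel object is touched. */
bool
toy_modify_cv(ToyEventSet *set, int pos, unsigned events, const ToyCV *cv)
{
	assert(pos >= 0 && pos < set->nevents);

	if (set->events[pos] != TOY_WL_CONDITION_VARIABLE)
		return false;		/* not a CV event */
	if (events != TOY_WL_CONDITION_VARIABLE || cv == NULL)
		return false;		/* type change or NULL cv rejected */
	set->cv = cv;
	return true;
}
```

This mirrors how pq_init() in the patch reserves a CV position with a NULL pointer and WalSndWait() later fills it in through ModifyWaitEvent().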
Attachments:
[application/octet-stream] v2-0001-Add-condition-variable-support-to-WaitEventSetWai.patch (23.4K, 2-v2-0001-Add-condition-variable-support-to-WaitEventSetWai.patch)
From 8e30e9c9d111942dc232617392836727da292197 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <[email protected]>
Date: Tue, 31 Mar 2026 08:25:12 +0800
Subject: [PATCH v2] Add condition variable support to WaitEventSetWait()
WalSndWait() currently combines WaitEventSetWait() with manual
ConditionVariablePrepareToSleep() / ConditionVariableCancelSleep()
calls.
This patch teaches WaitEventSetWait() to cooperate with condition
variables directly. A wait-event set can now include a condition
variable event, and WaitEventSetWait() takes care of preparing to
sleep on the condition variable before blocking and canceling the
sleep afterwards.
With that in place, WalSndWait() no longer needs to manage the
condition-variable sleep state manually, and can express the wakeup
source through the wait-event set itself.
Author: Chao Li <[email protected]>
---
contrib/postgres_fdw/postgres_fdw.c | 2 +-
src/backend/executor/execAsync.c | 2 +-
src/backend/executor/nodeAppend.c | 4 +-
src/backend/libpq/be-secure.c | 4 +-
src/backend/libpq/pqcomm.c | 12 +-
src/backend/postmaster/postmaster.c | 4 +-
src/backend/postmaster/syslogger.c | 4 +-
src/backend/replication/walsender.c | 17 ++-
src/backend/storage/ipc/latch.c | 16 +--
src/backend/storage/ipc/waiteventset.c | 114 +++++++++++++++++-
src/backend/storage/lmgr/condition_variable.c | 33 +++--
src/backend/utils/init/miscinit.c | 4 +-
src/include/libpq/libpq.h | 3 +-
src/include/storage/condition_variable.h | 1 +
src/include/storage/waiteventset.h | 7 +-
15 files changed, 179 insertions(+), 48 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 41e47cc795b..5895c59a5c5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7277,7 +7277,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
Assert(pendingAreq == areq);
AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
- NULL, areq);
+ NULL, NULL, areq);
}
/*
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index cf7ddbb01f4..e048328f583 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -57,7 +57,7 @@ ExecAsyncRequest(AsyncRequest *areq)
* for which it wishes to wait. We expect the node-type specific callback to
* make a single call of the following form:
*
- * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, NULL, areq);
*/
void
ExecAsyncConfigureWait(AsyncRequest *areq)
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 85c85569b5e..27895b96b50 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1045,7 +1045,7 @@ ExecAppendAsyncEventWait(AppendState *node)
Assert(node->as_eventset == NULL);
node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
@@ -1081,7 +1081,7 @@ ExecAppendAsyncEventWait(AppendState *node)
* extensions too.
*/
AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
/* Return at most EVENT_BUFFER_SIZE events in one call. */
if (nevents > EVENT_BUFFER_SIZE)
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 617704bb993..0a406a050d9 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -217,7 +217,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_READ);
@@ -342,7 +342,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_WRITE);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4a442f22df6..5d3a5c22db0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -177,6 +177,7 @@ pq_init(ClientSocket *client_sock)
Port *port;
int socket_pos PG_USED_FOR_ASSERTS_ONLY;
int latch_pos PG_USED_FOR_ASSERTS_ONLY;
+ int cv_pos PG_USED_FOR_ASSERTS_ONLY;
/* allocate the Port struct and copy the ClientSocket contents to it */
port = palloc0_object(Port);
@@ -307,11 +308,13 @@ pq_init(ClientSocket *client_sock)
FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents);
socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
- port->sock, NULL, NULL);
+ port->sock, NULL, NULL, NULL);
latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
+ cv_pos = AddWaitEventToSet(FeBeWaitSet, WL_CONDITION_VARIABLE, PGINVALID_SOCKET,
+ NULL, NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/*
* The event positions match the order we added them, but let's sanity
@@ -319,6 +322,7 @@ pq_init(ClientSocket *client_sock)
*/
Assert(socket_pos == FeBeWaitSetSocketPos);
Assert(latch_pos == FeBeWaitSetLatchPos);
+ Assert(cv_pos == FeBeWaitSetCVPos);
return port;
}
@@ -2063,7 +2067,7 @@ pq_check_connection(void)
* It's OK to modify the socket event filter without restoring, because
* all FeBeWaitSet socket wait sites do the same.
*/
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL, NULL);
retry:
rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index abf0c97569e..f96cce8730d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1638,13 +1638,13 @@ ConfigurePostmasterWaitSet(bool accept_connections)
pm_wait_set = CreateWaitEventSet(NULL,
accept_connections ? (1 + NumListenSockets) : 1);
AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
- NULL);
+ NULL, NULL);
if (accept_connections)
{
for (int i = 0; i < NumListenSockets; i++)
AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
- NULL, NULL);
+ NULL, NULL, NULL);
}
}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 0c2a7bc8578..4db7df8e4a5 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -338,9 +338,9 @@ SysLoggerMain(const void *startup_data, size_t startup_data_len)
* (including the postmaster).
*/
wes = CreateWaitEventSet(NULL, 2);
- AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL, NULL);
#ifndef WIN32
- AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
+ AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL, NULL);
#endif
/* main worker loop */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66507e9c2dd..d2749a2ca67 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3846,8 +3846,9 @@ static void
WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
{
WaitEvent event;
+ ConditionVariable *cv = NULL;
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL, NULL);
/*
* We use a condition variable to efficiently wake up walsenders in
@@ -3867,9 +3868,6 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* release for every iteration, just to wake up only the waiting
* walsenders. It makes WalSndWakeup() callers' life easy.
*
- * XXX: A desirable future improvement would be to add support for CVs
- * into WaitEventSetWait().
- *
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
*
@@ -3878,20 +3876,19 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* the receipt of the LSN.
*/
if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ cv = &WalSndCtl->wal_confirm_rcv_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ cv = &WalSndCtl->wal_flush_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+ cv = &WalSndCtl->wal_replay_cv;
+
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetCVPos, WL_CONDITION_VARIABLE, NULL, cv);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
{
- ConditionVariableCancelSleep();
proc_exit(1);
}
-
- ConditionVariableCancelSleep();
}
/*
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 8537e9fef2d..9b148d0f52b 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -41,7 +41,7 @@ InitializeLatchWaitSet(void)
/* Set up the WaitEventSet used by WaitLatch(). */
LatchWaitSet = CreateWaitEventSet(NULL, 2);
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
Assert(latch_pos == LatchWaitSetLatchPos);
/*
@@ -51,7 +51,7 @@ InitializeLatchWaitSet(void)
if (IsUnderPostmaster)
{
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH,
- PGINVALID_SOCKET, NULL, NULL);
+ PGINVALID_SOCKET, NULL, NULL, NULL);
Assert(latch_pos == LatchWaitSetPostmasterDeathPos);
}
}
@@ -186,12 +186,12 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
*/
if (!(wakeEvents & WL_LATCH_SET))
latch = NULL;
- ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
+ ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch, NULL);
if (IsUnderPostmaster)
ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
(wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
- NULL);
+ NULL, NULL);
if (WaitEventSetWait(LatchWaitSet,
(wakeEvents & WL_TIMEOUT) ? timeout : -1,
@@ -235,7 +235,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if (wakeEvents & WL_LATCH_SET)
AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
- latch, NULL);
+ latch, NULL, NULL);
/* Postmaster-managed callers must handle postmaster death somehow. */
Assert(!IsUnderPostmaster ||
@@ -244,18 +244,18 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if (wakeEvents & WL_SOCKET_MASK)
{
int ev;
ev = wakeEvents & WL_SOCKET_MASK;
- AddWaitEventToSet(set, ev, sock, NULL, NULL);
+ AddWaitEventToSet(set, ev, sock, NULL, NULL, NULL);
}
rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
diff --git a/src/backend/storage/ipc/waiteventset.c b/src/backend/storage/ipc/waiteventset.c
index 0f228e1e7b8..a685c93a5ad 100644
--- a/src/backend/storage/ipc/waiteventset.c
+++ b/src/backend/storage/ipc/waiteventset.c
@@ -73,6 +73,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/waiteventset.h"
#include "utils/memutils.h"
@@ -137,6 +138,15 @@ struct WaitEventSet
Latch *latch;
int latch_pos;
+ /*
+ * If WL_CONDITION_VARIABLE is specified in any wait event, cv is a
+ * pointer to said condition variable, and cv_pos the offset in the
+ * ->events array.
+ */
+ ConditionVariable *cv;
+ int cv_pos;
+ bool cv_maybe_signaled; /* recheck the CV for a signal after polling? */
+
/*
* WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag
* is set so that we'll exit immediately if postmaster death is detected,
@@ -414,6 +424,8 @@ CreateWaitEventSet(ResourceOwner resowner, int nevents)
#endif
set->latch = NULL;
+ set->cv = NULL;
+ set->cv_maybe_signaled = false;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
@@ -550,6 +562,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
* platforms, this is the same as WL_SOCKET_READABLE)
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
+ * - WL_CONDITION_VARIABLE: Wait for a condition variable to be signaled.
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
* used to modify previously added wait events using ModifyWaitEvent().
@@ -568,7 +581,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
*/
int
AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
- void *user_data)
+ ConditionVariable *cv, void *user_data)
{
WaitEvent *event;
@@ -596,6 +609,18 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
elog(ERROR, "cannot wait on latch without a specified latch");
}
+ /*
+ * When adding a CV event, we allow a NULL cv, because it can be set later
+ * with ModifyWaitEvent.
+ */
+ if (cv)
+ {
+ if (set->cv)
+ elog(ERROR, "cannot wait on more than one condition variable");
+ if ((events & WL_CONDITION_VARIABLE) != WL_CONDITION_VARIABLE)
+ elog(ERROR, "condition variable events only support being set");
+ }
+
/* waiting for socket readiness without a socket indicates a bug */
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
@@ -624,6 +649,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#endif
#endif
}
+ else if (events == WL_CONDITION_VARIABLE)
+ {
+ set->cv = cv;
+ set->cv_pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ return event->pos;
+ }
else if (events == WL_POSTMASTER_DEATH)
{
#ifndef WIN32
@@ -653,14 +685,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
* 'pos' is the id returned by AddWaitEventToSet.
*/
void
-ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch,
+ ConditionVariable *cv)
{
WaitEvent *event;
#if defined(WAIT_USE_KQUEUE)
int old_events;
#endif
- Assert(pos < set->nevents);
+ Assert(pos >= 0 && pos < set->nevents);
event = &set->events[pos];
#if defined(WAIT_USE_KQUEUE)
@@ -682,6 +715,21 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
return;
}
+ /*
+ * Change the condition variable associated with the event. As a CV does
+ * not rely on the kernel, take a fast path and only update the CV
+ * pointer.
+ */
+ if (event->events == WL_CONDITION_VARIABLE)
+ {
+ if (events != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot change event type of a condition variable event");
+ if (cv == NULL)
+ elog(ERROR, "cannot set condition variable to NULL");
+ set->cv = cv;
+ return;
+ }
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -1061,6 +1109,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
else
INSTR_TIME_SET_ZERO(start_time);
+ if (set->cv != NULL)
+ ConditionVariablePrepareToSleep(set->cv);
+
pgstat_report_wait_start(wait_event_info);
#ifndef WIN32
@@ -1072,6 +1123,7 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
while (returned_events == 0)
{
int rc;
+ bool cv_signaled = false;
/*
* Check if the latch is set already first. If so, we either exit
@@ -1133,6 +1185,37 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
timeout = 0;
}
+ if (set->cv)
+ cv_signaled = ConditionVariableSignaled(set->cv);
+
+ if (set->cv && !cv_signaled)
+ set->cv_maybe_signaled = true;
+
+ if (set->cv && cv_signaled)
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ /* could have been set above */
+ set->cv_maybe_signaled = false;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+
+ /*
+ * Even though we already have an event, we'll poll just once with
+ * zero timeout to see what non-latch events we can fit into the
+ * output buffer at the same time.
+ */
+ cur_timeout = 0;
+ timeout = 0;
+ }
+
/*
* Wait for events using the readiness primitive chosen at the top of
* this file. If -1 is returned, a timeout has occurred, if 0 we have
@@ -1145,11 +1228,32 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
set->latch->maybe_sleeping)
set->latch->maybe_sleeping = false;
+ if (set->cv && set->cv_maybe_signaled)
+ set->cv_maybe_signaled = false;
+
if (rc == -1)
break; /* timeout occurred */
else
+ {
returned_events += rc;
+ /* Check CV again after waiting if not done before waiting */
+ if (set->cv && set->cv_maybe_signaled && returned_events < nevents &&
+ ConditionVariableSignaled(set->cv))
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+ }
+ }
+
/* If we're not done, update cur_timeout for next iteration */
if (returned_events == 0 && timeout >= 0)
{
@@ -1166,10 +1270,12 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
pgstat_report_wait_end();
+ if (set->cv != NULL)
+ ConditionVariableCancelSleep();
+
return returned_events;
}
-
#if defined(WAIT_USE_EPOLL)
/*
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 1f16b3f7475..c04bc420bcf 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -182,13 +182,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* by something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid this obvious case.
*/
- SpinLockAcquire(&cv->mutex);
- if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
- {
- done = true;
- proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
- }
- SpinLockRelease(&cv->mutex);
+ done = ConditionVariableSignaled(cv);
/*
* Check for interrupts, and return spuriously if that caused the
@@ -217,6 +211,31 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
}
}
+/*
+ * Check whether this process was removed from cv's wait list by a CV
+ * signal/broadcast. If so, re-add it to preserve wakeups while the caller
+ * checks the predicate.
+ */
+bool
+ConditionVariableSignaled(ConditionVariable *cv)
+{
+ bool signaled = false;
+
+ /* Ignore probes for CVs we are not currently prepared to sleep on. */
+ if (cv_sleep_target != cv)
+ return false;
+
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
+ {
+ signaled = true;
+ proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+
+ return signaled;
+}
+
/*
* Cancel any pending sleep operation.
*
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index ba191977697..b970c1d727c 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -222,7 +222,7 @@ SwitchToSharedLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
/*
* Set the shared latch as the local one might have been set. This
@@ -249,7 +249,7 @@ SwitchBackToLocalLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
SetLatch(MyLatch);
}
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index c9b934d2321..8d13eaf8393 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -65,7 +65,8 @@ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
#define FeBeWaitSetSocketPos 0
#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
+#define FeBeWaitSetCVPos 2
+#define FeBeWaitSetNEvents 4
extern int ListenServerPort(int family, const char *hostName,
unsigned short portNumber, const char *unixSocketDir,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 14bd6dd55c0..fb7e4bb67f2 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -57,6 +57,7 @@ extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
extern bool ConditionVariableCancelSleep(void);
+extern bool ConditionVariableSignaled(ConditionVariable *cv);
/*
* Optionally, ConditionVariablePrepareToSleep can be called before entering
diff --git a/src/include/storage/waiteventset.h b/src/include/storage/waiteventset.h
index 5341267f0a0..f3d569a6aa0 100644
--- a/src/include/storage/waiteventset.h
+++ b/src/include/storage/waiteventset.h
@@ -25,6 +25,7 @@
#ifndef WAITEVENTSET_H
#define WAITEVENTSET_H
+#include "storage/condition_variable.h"
#include "utils/resowner.h"
/*
@@ -50,6 +51,7 @@
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_ACCEPT WL_SOCKET_READABLE
#endif
+#define WL_CONDITION_VARIABLE (1 << 9)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED | \
@@ -81,9 +83,10 @@ extern WaitEventSet *CreateWaitEventSet(ResourceOwner resowner, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern void FreeWaitEventSetAfterFork(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
- struct Latch *latch, void *user_data);
+ struct Latch *latch, ConditionVariable *cv,
+ void *user_data);
extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
- struct Latch *latch);
+ struct Latch *latch, ConditionVariable *cv);
extern int WaitEventSetWait(WaitEventSet *set, long timeout,
WaitEvent *occurred_events, int nevents,
uint32 wait_event_info);
--
2.50.1 (Apple Git-155)
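
The "removed from the wait list means signaled, so re-add yourself" idea behind ConditionVariableSignaled() can be illustrated with a small single-process model. This is only a sketch under stated assumptions: the `ToyCV` type and all `toy_*` functions are hypothetical, a boolean array replaces the proclist, and there is no spinlock because nothing here runs concurrently.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_PROCS 8

/* Hypothetical stand-in for a ConditionVariable's wakeup list. */
typedef struct ToyCV
{
	bool		waiting[TOY_MAX_PROCS];	/* true if that process is queued */
} ToyCV;

static ToyCV *toy_sleep_target = NULL;	/* models cv_sleep_target */
static int	toy_my_proc = 0;			/* models MyProcNumber */

/* Queue ourselves on the CV before checking the predicate or sleeping. */
void
toy_prepare_to_sleep(ToyCV *cv)
{
	cv->waiting[toy_my_proc] = true;
	toy_sleep_target = cv;
}

/* A broadcast wakes every waiter by removing it from the list. */
void
toy_broadcast(ToyCV *cv)
{
	for (int i = 0; i < TOY_MAX_PROCS; i++)
		cv->waiting[i] = false;
}

/*
 * Report whether we were removed from the list since the last check.
 * If so, re-add ourselves so a later wakeup is not lost while the
 * caller rechecks its predicate.
 */
bool
toy_signaled(ToyCV *cv)
{
	if (toy_sleep_target != cv)
		return false;		/* not prepared to sleep on this CV */

	if (!cv->waiting[toy_my_proc])
	{
		cv->waiting[toy_my_proc] = true;
		return true;
	}
	return false;
}

/* Leave the wait list and forget the sleep target. */
void
toy_cancel_sleep(void)
{
	if (toy_sleep_target != NULL)
	{
		toy_sleep_target->waiting[toy_my_proc] = false;
		toy_sleep_target = NULL;
	}
}
```

In this model a second probe right after a positive one returns false, because the first probe already re-queued the process. That is the property the wait loop relies on when it reports a WL_CONDITION_VARIABLE event once per wakeup.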