public inbox for [email protected]
help / color / mirror / Atom feedFrom: Jacob Champion <[email protected]>
To: Thomas Munro <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: Daniel Gustafsson <[email protected]>
Cc: Peter Eisentraut <[email protected]>
Subject: Re: [PATCH] OAuth: fix performance bug with stuck multiplexer events
Date: Wed, 6 Aug 2025 16:55:44 -0700
Message-ID: <CAOYmi+mvk8Y7btYJhBzOGiNTY3cCpYZKjhA4-TP2Lkb=zOr4oQ@mail.gmail.com> (raw)
In-Reply-To: <CAOYmi+n1xRNCDnwZzigXVk8V=+sr7ZzuGpJ0tAyozX-zQT19Gg@mail.gmail.com>
References: <CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com>
<CAOYmi+mRhhzGUvrcdickepAnsdaGbqhNcboNe4-YvgtkSzGNbQ@mail.gmail.com>
<CA+hUKGLyS-GK_rkENaVbFVTa4VJ+reJxWtt+q4gmgNUXhptfYA@mail.gmail.com>
<CAOYmi+k1q3feeMxfbaJA=+hx+XnOFQA0z2JU+0igA7fTUZTmoA@mail.gmail.com>
<CA+hUKG+H1gwDh96jn5jB6Q3HyXrSC9x2y=uQJAthT8NLs6GN_Q@mail.gmail.com>
<CAOYmi+n1xRNCDnwZzigXVk8V=+sr7ZzuGpJ0tAyozX-zQT19Gg@mail.gmail.com>
On Wed, Aug 6, 2025 at 9:13 AM Jacob Champion
<[email protected]> wrote:
> Maybe "drain" would no longer be the
> verb to use there.
I keep describing this as "combing" the queue when I talk about it in
person, so v3-0001 renames this new operation to comb_multiplexer().
And the CI (plus the more strenuous TLS tests) confirms that the
callback count is still stable with this weaker guarantee, so I've
gotten rid of the event-counting code.
Now that I'm no longer counting events, I can collapse the changes to
register_socket(). I can't revert those changes entirely, because then
we regress the case where Curl switches a socket from IN to OUT (this
is enforced by the new unit tests). But I'm not sure that the existing
comment adequately explained that fix anyway, and I didn't remember to
call it out in my initial email, so I've split it out into v3-0002.
It's much smaller.
The tests (now in 0005) have been adjusted for the new "combing"
behavior, and I've added a case to ensure that multiple stale events
are swept up by a single call to comb_multiplexer().
Thanks!
--Jacob
1: 379c12b5d26 < -: ----------- oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 1: c5cdccfe374 oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 2: 7725e0c173b oauth: Ensure unused socket registrations are removed
2: f30317d7265 ! 3: 6ccf7a5d156 oauth: Remove expired timers from the multiplexer
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn)
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
-+ * This can't be combined with the drain_socket_events()
-+ * call below: we might accidentally clear a short timeout
-+ * that was both set and expired during the call to
++ * This can't be combined with the comb_multiplexer() call
++ * below: we might accidentally clear a short timeout that
++ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
3: d243d28964d ! 4: 2be993b8f07 oauth: Track total call count during a client flow
@@ Metadata
## Commit message ##
oauth: Track total call count during a client flow
- Tracking down the bugs that led to the addition of drain_socket_events()
+ Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
@@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
+ };
- #if defined(HAVE_SYS_EVENT_H)
- int nevents; /* how many events are we waiting on? */
+ /*
@@ src/interfaces/libpq-oauth/oauth-curl.c: PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
-+ * To assist with finding bugs in drain_socket_events() and
++ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
4: ca6fd237653 ! 5: 50257bf32eb oauth: Add unit tests for multiplexer handling
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
-+ * event is drained.
++ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ ssize_t written;
+
+ /*
-+ * Fill the pipe. Once the old writable event is drained, the mux
++ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
++ bool expired;
++
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
-+ * Order matters to avoid false negatives. First drain the socket,
-+ * then unset the timer. We're trying to catch the case where the
-+ * pending timer expiration event takes the place of one of the
-+ * socket events we're attempting to drain.
++ * Order matters, since comb_multiplexer() doesn't have to remove
++ * stale events when active events exist. Follow the call sequence
++ * used in the code: drain the timer expiration, drain the pipe,
++ * then clear the stale events.
+ */
++ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
-+ Assert(set_timer(actx, -1));
++ Assert(comb_multiplexer(actx));
+
++ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
++
++ /* Ensure comb_multiplexer() can handle multiple stale events. */
++ {
++ int rfd2,
++ wfd2;
++
++ /* Create a second local pipe. */
++ Assert(pipe(pipefd) == 0);
++ rfd2 = pipefd[0];
++ wfd2 = pipefd[1];
++
++ /* Make both rfds appear unidirectional if necessary. */
++ if (bidirectional)
++ {
++ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
++ Assert(fill_pipe(rfd2) == bidi_pipe_size);
++ }
++
++ /* Register for read events on both fds, and make them readable. */
++ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
++
++ Assert(write(wfd, "x", 1) == 1);
++ Assert(write(wfd2, "x", 1) == 1);
++
++ mux_is_ready(actx->mux, deadline, "when two fds are readable");
++
++ /*
++ * Drain both fds. comb_multiplexer() should then ensure that the
++ * mux is no longer readable.
++ */
++ Assert(drain_pipe(rfd, 1));
++ Assert(drain_pipe(rfd2, 1));
++ Assert(comb_multiplexer(actx));
++ mux_is_not_ready(actx->mux, "when two fds are drained");
++
++ /* Stop listening. */
++ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
++
++ /* Undo any unidirectional emulation. */
++ if (bidirectional)
++ {
++ Assert(drain_pipe(wfd, bidi_pipe_size));
++ Assert(drain_pipe(wfd2, bidi_pipe_size));
++ }
++
++ close(rfd2);
++ close(wfd2);
++ }
+ }
+
+ close(rfd);
Attachments:
[text/plain] since-v2.diff.txt (7.0K, 2-since-v2.diff.txt)
download | inline:
1: 379c12b5d26 < -: ----------- oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 1: c5cdccfe374 oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 2: 7725e0c173b oauth: Ensure unused socket registrations are removed
2: f30317d7265 ! 3: 6ccf7a5d156 oauth: Remove expired timers from the multiplexer
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn)
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
-+ * This can't be combined with the drain_socket_events()
-+ * call below: we might accidentally clear a short timeout
-+ * that was both set and expired during the call to
++ * This can't be combined with the comb_multiplexer() call
++ * below: we might accidentally clear a short timeout that
++ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
3: d243d28964d ! 4: 2be993b8f07 oauth: Track total call count during a client flow
@@ Metadata
## Commit message ##
oauth: Track total call count during a client flow
- Tracking down the bugs that led to the addition of drain_socket_events()
+ Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
@@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
+ };
- #if defined(HAVE_SYS_EVENT_H)
- int nevents; /* how many events are we waiting on? */
+ /*
@@ src/interfaces/libpq-oauth/oauth-curl.c: PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
-+ * To assist with finding bugs in drain_socket_events() and
++ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
4: ca6fd237653 ! 5: 50257bf32eb oauth: Add unit tests for multiplexer handling
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
-+ * event is drained.
++ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ ssize_t written;
+
+ /*
-+ * Fill the pipe. Once the old writable event is drained, the mux
++ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
++ bool expired;
++
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
-+ * Order matters to avoid false negatives. First drain the socket,
-+ * then unset the timer. We're trying to catch the case where the
-+ * pending timer expiration event takes the place of one of the
-+ * socket events we're attempting to drain.
++ * Order matters, since comb_multiplexer() doesn't have to remove
++ * stale events when active events exist. Follow the call sequence
++ * used in the code: drain the timer expiration, drain the pipe,
++ * then clear the stale events.
+ */
++ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
-+ Assert(set_timer(actx, -1));
++ Assert(comb_multiplexer(actx));
+
++ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
++
++ /* Ensure comb_multiplexer() can handle multiple stale events. */
++ {
++ int rfd2,
++ wfd2;
++
++ /* Create a second local pipe. */
++ Assert(pipe(pipefd) == 0);
++ rfd2 = pipefd[0];
++ wfd2 = pipefd[1];
++
++ /* Make both rfds appear unidirectional if necessary. */
++ if (bidirectional)
++ {
++ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
++ Assert(fill_pipe(rfd2) == bidi_pipe_size);
++ }
++
++ /* Register for read events on both fds, and make them readable. */
++ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
++
++ Assert(write(wfd, "x", 1) == 1);
++ Assert(write(wfd2, "x", 1) == 1);
++
++ mux_is_ready(actx->mux, deadline, "when two fds are readable");
++
++ /*
++ * Drain both fds. comb_multiplexer() should then ensure that the
++ * mux is no longer readable.
++ */
++ Assert(drain_pipe(rfd, 1));
++ Assert(drain_pipe(rfd2, 1));
++ Assert(comb_multiplexer(actx));
++ mux_is_not_ready(actx->mux, "when two fds are drained");
++
++ /* Stop listening. */
++ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
++
++ /* Undo any unidirectional emulation. */
++ if (bidirectional)
++ {
++ Assert(drain_pipe(wfd, bidi_pipe_size));
++ Assert(drain_pipe(wfd2, bidi_pipe_size));
++ }
++
++ close(rfd2);
++ close(wfd2);
++ }
+ }
+
+ close(rfd);
[application/octet-stream] v3-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch (4.6K, 3-v3-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch)
download | inline diff:
From c5cdccfe374ff9d45219f705511785318833f6eb Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v3 1/5] oauth: Remove stale events from the kqueue multiplexer
If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().
In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.
Implement comb_multiplexer() to call kevent() and unstick any stale
events that would cause unnecessary callbacks. This is called right
after drive_request(), before we return control to the client to wait.
Suggested-by: Thomas Munro <[email protected]>
---
src/interfaces/libpq-oauth/oauth-curl.c | 74 +++++++++++++++++++++++--
1 file changed, 68 insertions(+), 6 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..3380a17628e 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1376,6 +1376,60 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
#endif
}
+/*-------
+ * If there is no work to do on any of the descriptors in the multiplexer, then
+ * this function must ensure that the multiplexer is not readable.
+ *
+ * As a motivating example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ * client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ * and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ * The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+comb_multiplexer(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+ /* The epoll implementation doesn't hold onto stale events. */
+ return true;
+#elif defined(HAVE_SYS_EVENT_H)
+ struct timespec timeout = {0};
+ struct kevent ev;
+
+ /*
+ * Try to read a single pending event. We can actually ignore the result:
+ * either we found an event to process, in which case the multiplexer is
+ * correctly readable for that event at minimum, and it doesn't matter if
+ * there are any stale events; or we didn't find any, in which case the
+ * kernel will have discarded any stale events as it traveled to the end
+ * of the queue.
+ *
+ * Note that this depends on our registrations being level-triggered --
+ * even the timer, so we use a chained kqueue for that instead of an
+ * EVFILT_TIMER on the top-level mux. If we used edge-triggered events,
+ * this call would improperly discard them.
+ */
+ if (kevent(actx->mux, NULL, 0, &ev, 1, &timeout) < 0)
+ {
+ actx_error(actx, "could not comb kqueue: %m");
+ return false;
+ }
+
+ return true;
+#else
+#error comb_multiplexer is not implemented on this platform
+#endif
+}
+
/*
* Enables or disables the timer in the multiplexer set. The timeout value is
* in milliseconds (negative values disable the timer).
@@ -2755,13 +2809,21 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
if (status == PGRES_POLLING_FAILED)
goto error_return;
- else if (status != PGRES_POLLING_OK)
- {
- /* not done yet */
- return status;
- }
+ else if (status == PGRES_POLLING_OK)
+ break; /* done! */
- break;
+ /*
+ * This request is still running.
+ *
+ * Make sure that stale events don't cause us to come back
+ * early. (Currently, this can occur only with kqueue.) If
+ * this is forgotten, the multiplexer can get stuck in a
+ * signalled state and we'll burn CPU cycles pointlessly.
+ */
+ if (!comb_multiplexer(actx))
+ goto error_return;
+
+ return status;
}
case OAUTH_STEP_WAIT_INTERVAL:
--
2.34.1
[application/octet-stream] v3-0002-oauth-Ensure-unused-socket-registrations-are-remo.patch (3.0K, 4-v3-0002-oauth-Ensure-unused-socket-registrations-are-remo.patch)
download | inline diff:
From 7725e0c173bb42c1511e780b93d637a9b787904e Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 6 Aug 2025 15:18:08 -0700
Subject: [PATCH v3 2/5] oauth: Ensure unused socket registrations are removed
If Curl needs to switch the direction of a socket's registration (e.g.
from CURL_POLL_IN to CURL_POLL_OUT), it expects the old registration to
be discarded. For epoll, this happened via EPOLL_CTL_MOD, but for
kqueue, the old registration would remain if it was not explicitly
removed by Curl.
Explicitly remove the opposite-direction event during registrations. (If
that event doesn't exist, we'll just get an ENOENT, which will be
ignored by the same code that handles CURL_POLL_REMOVE.) A few
assertions are also added to strengthen the relationship between the
number of events added, the number of events pulled off the queue, and
the lengths of the kevent arrays.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 3380a17628e..7a87a96d2ed 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1291,22 +1291,31 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return 0;
#elif defined(HAVE_SYS_EVENT_H)
- struct kevent ev[2] = {0};
+ struct kevent ev[2];
struct kevent ev_out[2];
struct timespec timeout = {0};
int nev = 0;
int res;
+ /*
+ * We don't know which of the events is currently registered, perhaps
+ * both, so we always try to remove unneeded events. This means we need to
+ * tolerate ENOENT below.
+ */
switch (what)
{
case CURL_POLL_IN:
EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
nev++;
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
break;
case CURL_POLL_OUT:
EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
nev++;
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
break;
case CURL_POLL_INOUT:
@@ -1317,12 +1326,6 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
break;
case CURL_POLL_REMOVE:
-
- /*
- * We don't know which of these is currently registered, perhaps
- * both, so we try to remove both. This means we need to tolerate
- * ENOENT below.
- */
EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
nev++;
EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
@@ -1334,7 +1337,10 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return -1;
}
- res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+ Assert(nev <= lengthof(ev));
+ Assert(nev <= lengthof(ev_out));
+
+ res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
if (res < 0)
{
actx_error(actx, "could not modify kqueue: %m");
--
2.34.1
[application/octet-stream] v3-0003-oauth-Remove-expired-timers-from-the-multiplexer.patch (5.6K, 5-v3-0003-oauth-Remove-expired-timers-from-the-multiplexer.patch)
download | inline diff:
From 6ccf7a5d156975f8f4bf29a82cf3ef2f20d76853 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH v3 3/5] oauth: Remove expired timers from the multiplexer
In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().
Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.
The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.
---
src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 7a87a96d2ed..1c354174083 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1543,40 +1543,20 @@ set_timer(struct async_ctx *actx, long timeout)
/*
* Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
*/
static int
timer_expired(struct async_ctx *actx)
{
-#if defined(HAVE_SYS_EPOLL_H)
- struct itimerspec spec = {0};
-
- if (timerfd_gettime(actx->timerfd, &spec) < 0)
- {
- actx_error(actx, "getting timerfd value: %m");
- return -1;
- }
-
- /*
- * This implementation assumes we're using single-shot timers. If you
- * change to using intervals, you'll need to reimplement this function
- * too, possibly with the read() or select() interfaces for timerfd.
- */
- Assert(spec.it_interval.tv_sec == 0
- && spec.it_interval.tv_nsec == 0);
-
- /* If the remaining time to expiration is zero, we're done. */
- return (spec.it_value.tv_sec == 0
- && spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
int res;
- /* Is the timer queue ready? */
+ /* Is the timer ready? */
res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
if (res < 0)
{
- actx_error(actx, "checking kqueue for timeout: %m");
+ actx_error(actx, "checking timer expiration: %m");
return -1;
}
@@ -1608,6 +1588,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
return 0;
}
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it will contain whether or not the timer was expired at time of call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+ int res;
+
+ res = timer_expired(actx);
+ if (res < 0)
+ return false;
+
+ if (res > 0)
+ {
+ /*
+ * Timer is expired. We could drain the event manually from the
+ * timerfd, but it's easier to simply disable it; that keeps the
+ * platform-specific code in set_timer().
+ */
+ if (!set_timer(actx, -1))
+ return false;
+ }
+
+ if (was_expired)
+ *was_expired = (res > 0);
+
+ return true;
+}
+
/*
* Prints Curl request debugging information to stderr.
*
@@ -2811,6 +2821,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
{
PostgresPollingStatusType status;
+ /*
+ * Clear any expired timeout before calling back into
+ * Curl. Curl is not guaranteed to do this for us, because
+ * its API expects us to use single-shot (i.e.
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
+ * This can't be combined with the comb_multiplexer() call
+ * below: we might accidentally clear a short timeout that
+ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
+ goto error_return;
+
+ /* Move the request forward. */
status = drive_request(actx);
if (status == PGRES_POLLING_FAILED)
@@ -2833,24 +2859,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
}
case OAUTH_STEP_WAIT_INTERVAL:
-
- /*
- * The client application is supposed to wait until our timer
- * expires before calling PQconnectPoll() again, but that
- * might not happen. To avoid sending a token request early,
- * check the timer before continuing.
- */
- if (!timer_expired(actx))
{
- set_conn_altsock(conn, actx->timerfd);
- return PGRES_POLLING_READING;
- }
+ bool expired;
- /* Disable the expired timer. */
- if (!set_timer(actx, -1))
- goto error_return;
+ /*
+ * The client application is supposed to wait until our
+ * timer expires before calling PQconnectPoll() again, but
+ * that might not happen. To avoid sending a token request
+ * early, check the timer before continuing.
+ */
+ if (!drain_timer_events(actx, &expired))
+ goto error_return;
- break;
+ if (!expired)
+ {
+ set_conn_altsock(conn, actx->timerfd);
+ return PGRES_POLLING_READING;
+ }
+
+ break;
+ }
}
/*
--
2.34.1
[application/octet-stream] v3-0004-oauth-Track-total-call-count-during-a-client-flow.patch (4.7K, 6-v3-0004-oauth-Track-total-call-count-during-a-client-flow.patch)
download | inline diff:
From 2be993b8f07e3bcb257f2bb8b579113985d4214c Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH v3 4/5] oauth: Track total call count during a client flow
Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.
A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++
.../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 1c354174083..41cd3bf9691 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
};
/*
@@ -3028,6 +3029,8 @@ PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
PostgresPollingStatusType result;
+ fe_oauth_state *state = conn_sasl_state(conn);
+ struct async_ctx *actx;
#ifndef WIN32
sigset_t osigset;
bool sigpipe_pending;
@@ -3056,6 +3059,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
+ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ * Be careful that state->async_ctx could be NULL if early initialization
+ * fails during the first call.
+ */
+ actx = state->async_ctx;
+ Assert(actx || result == PGRES_POLLING_FAILED);
+
+ if (actx && actx->debugging)
+ {
+ actx->dbg_num_calls++;
+ if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+ fprintf(stderr, "[libpq] total number of polls: %d\n",
+ actx->dbg_num_calls);
+ }
+
#ifndef WIN32
if (masked)
{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
);
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+ 'postgres',
+ "SELECT 'connected for call count'",
+ extra_params => ['-w'],
+ connstr => connstr(stage => 'token', retries => 2),
+ on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+ $stderr,
+ qr@Visit https://example\.com/ and enter the code: postgresuser@,
+ "call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+ # For reference, a typical flow with two retries might take between 5-15
+ # calls to the client implementation. And while this will probably continue
+ # to change across OSes and Curl updates, we're likely in trouble if we see
+ # hundreds or thousands of calls.
+ $stderr =~ $count_pattern;
+ cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
# Stress test: make sure our builtin flow operates correctly even if the client
# application isn't respecting PGRES_POLLING_READING/WRITING signals returned
# from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
connstr(stage => 'all', retries => 1, interval => 1));
note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
unlike(
--
2.34.1
[application/octet-stream] v3-0005-oauth-Add-unit-tests-for-multiplexer-handling.patch (18.7K, 7-v3-0005-oauth-Add-unit-tests-for-multiplexer-handling.patch)
download | inline diff:
From 50257bf32eb2b0972e5139ac4a79367372c77385 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v3 5/5] oauth: Add unit tests for multiplexer handling
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
---
src/interfaces/libpq-oauth/Makefile | 14 +
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 527 +++++++++++++++++++
4 files changed, 600 insertions(+)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 682f17413b3..e73573694b9 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..e769856c2c9
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
+ '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..8263aff2f4a
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,527 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Reset the descriptor flags. */
+ Assert(fcntl(fd, F_SETFD, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ bool expired;
+
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters, since comb_multiplexer() doesn't have to remove
+ * stale events when active events exist. Follow the call sequence
+ * used in the code: drain the timer expiration, drain the pipe,
+ * then clear the stale events.
+ */
+ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+
+ /* Ensure comb_multiplexer() can handle multiple stale events. */
+ {
+ int rfd2,
+ wfd2;
+
+ /* Create a second local pipe. */
+ Assert(pipe(pipefd) == 0);
+ rfd2 = pipefd[0];
+ wfd2 = pipefd[1];
+
+ /* Make both rfds appear unidirectional if necessary. */
+ if (bidirectional)
+ {
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+ Assert(fill_pipe(rfd2) == bidi_pipe_size);
+ }
+
+ /* Register for read events on both fds, and make them readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
+
+ Assert(write(wfd, "x", 1) == 1);
+ Assert(write(wfd2, "x", 1) == 1);
+
+ mux_is_ready(actx->mux, deadline, "when two fds are readable");
+
+ /*
+ * Drain both fds. comb_multiplexer() should then ensure that the
+ * mux is no longer readable.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_pipe(rfd2, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when two fds are drained");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ {
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ Assert(drain_pipe(wfd2, bidi_pipe_size));
+ }
+
+ close(rfd2);
+ close(wfd2);
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
view thread (25+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected], [email protected], [email protected]
Subject: Re: [PATCH] OAuth: fix performance bug with stuck multiplexer events
In-Reply-To: <CAOYmi+mvk8Y7btYJhBzOGiNTY3cCpYZKjhA4-TP2Lkb=zOr4oQ@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox