commit c7daefa51118d2041623b14b7f26c9177ac0b6cd
Author: Chao Li (Evan)
Date:   Sat Oct 25 15:42:26 2025 +0800

    Fix race between direct queue advancement and listening backends

    Previously SignalBackends() directly advanced every listener without a
    pending wakeup whose queue position equalled the old queue head,
    regardless of whether that listener was interested in the channels
    just notified or was in the middle of reading the queue.  Now:

    * listeners interested in any of the pending channels are never
      directly advanced;
    * a backend that is reading the queue ("awakening") is not moved
      under its feet; instead an advisory position is recorded, which the
      backend applies once it has finished reading;
    * the reading backend publishes its queue position once, after the
      read loop, rather than after every page.

    Because of the last point, an error while processing a later page
    causes messages from earlier pages of the same read cycle to be
    processed again.  Interested listeners are tracked in a List, which
    makes the membership test O(n); it should be replaced by a hash table
    or bitmap if this approach is adopted.

    Also fix the injection-point skip check in 008_listen-pos-race.pl:
    "//" binds more loosely than "ne", so the condition as written
    evaluated to 'yes' (true) when injection points were enabled and the
    test was always skipped.

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e8d728e9ce..3c8a640ebed 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -250,6 +250,11 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	((x).page == (y).page && (x).offset == (y).offset)
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 #define QUEUE_POS_IS_ZERO(x) \
 	((x).page == 0 && (x).offset == 0)
 
@@ -287,7 +292,9 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* position advised while awakening, or 0/0 */
 	bool		wakeupPending;	/* signal sent but not yet processed */
+	bool		awakening;		/* backend is reading the queue */
 } QueueBackendStatus;
 
 /*
@@ -348,7 +355,9 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_AWAKENING(i)	(asyncQueueControl->backend[i].awakening)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -675,7 +684,9 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+			QUEUE_BACKEND_AWAKENING(i) = false;
 		}
 	}
 
@@ -1954,6 +1965,7 @@ SignalBackends(void)
 	ProcNumber *procnos;
 	int			count;
 	ListCell   *lc;
+	List	   *interestedProcs = NIL;
 
 	INJECTION_POINT("listen-notify-signal-backends", NULL);
 
@@ -2002,6 +2014,15 @@ SignalBackends(void)
 			int32		pid;
 			QueuePosition pos;
 
+			/*
+			 * XXX: Remember listeners interested in any of the pending
+			 * channels, so that they are never directly advanced below.  A
+			 * List makes the membership test O(n); if this approach is
+			 * adopted, a hash table or bitmap would be a better choice.
+			 */
+			if (!list_member_int(interestedProcs, i))
+				interestedProcs = lappend_int(interestedProcs, i);
+
 			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
 
@@ -2054,19 +2075,51 @@ SignalBackends(void)
 		int64		lag;
 		int32		pid;
 
-		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
-			continue;
+		/*
+		 * XXX: Don't check wakeupPending yet: the flag may have been set by
+		 * another notifier, and such a backend must still be considered for
+		 * direct advancement.  The flag is checked after that, below.
+		 */
 
 		pos = QUEUE_BACKEND_POS(i);
 
-		/* Direct advancement for idle backends at the old head */
-		if (pendingNotifies != NULL &&
-			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+		/*
+		 * Direct advancement, only for backends that are not interested in
+		 * any of the channels just notified.  'pos' is changed only when a
+		 * position is actually advanced or advised, so that the lag check
+		 * below still catches backends that have fallen behind.
+		 */
+		if (pendingNotifies != NULL && !list_member_int(interestedProcs, i))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			pos = queueHeadAfterWrite;
+			if (QUEUE_BACKEND_AWAKENING(i))
+			{
+				/*
+				 * The backend is reading the queue right now, so don't move
+				 * its position under it.  Advise a new position instead; the
+				 * backend applies it when it has finished reading.
+				 */
+				if ((QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite) ||
+					 (QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos) &&
+					  QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))) &&
+					QUEUE_POS_EQUAL(QUEUE_BACKEND_ADVISORY_POS(i),
+									queueHeadBeforeWrite))
+				{
+					QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+					pos = queueHeadAfterWrite;
+				}
+			}
+			else if (QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+			{
+				/* Idle backend at the old head: advance it directly */
+				QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+				SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
+				pos = queueHeadAfterWrite;
+			}
 		}
 
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
+
 		/* Signal backends that have fallen too far behind */
 		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 								 QUEUE_POS_PAGE(pos));
@@ -2321,6 +2374,7 @@ asyncQueueReadAllNotifications(void)
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = true;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2330,6 +2384,19 @@ asyncQueueReadAllNotifications(void)
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
 		/* Nothing to do, we have read all notifications already. */
+
+		/*
+		 * Still honor and clear any position a notifier advised while we
+		 * were marked as awakening, as is done after the read loop, so that
+		 * a stale advisory cannot leak into the next read cycle.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+			pos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+		SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(MyProcNumber), 0, 0);
+		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
@@ -2441,21 +2508,31 @@ asyncQueueReadAllNotifications(void)
 													   page_buffer.buf,
 													   snapshot);
 
-			/*
-			 * Update our position in shared memory.  The 'pos' variable now
-			 * holds our new position (advanced past all messages we just
-			 * processed).  This ensures that if we fail while processing
-			 * messages from the next page, we won't reprocess the ones we
-			 * just handled.  It also prevents us from overwriting any direct
-			 * advancement that another backend might have done while we were
-			 * processing messages.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_SHARED);
-			QUEUE_BACKEND_POS(MyProcNumber) = pos;
-			LWLockRelease(NotifyQueueLock);
-
+			/*
+			 * If a notifier has advised a position beyond the one we have
+			 * reached, stop reading; the advised position is applied after
+			 * the loop.  Our shared position is not updated per page, so it
+			 * keeps pointing at where this read cycle started.  Take the lock
+			 * because a QueuePosition cannot be read atomically.
+			 */
+			LWLockAcquire(NotifyQueueLock, LW_SHARED);
+			if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+				reachedStop = true;
+			LWLockRelease(NotifyQueueLock);
 		} while (!reachedStop);
 
+		/*
+		 * Publish our final position, honoring any position advised by a
+		 * notifier meanwhile, and leave the awakening state.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+			pos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+		SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(MyProcNumber), 0, 0);
+		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+		LWLockRelease(NotifyQueueLock);
+
 		/* Done with snapshot */
 		UnregisterSnapshot(snapshot);
 	}
diff --git a/src/test/authentication/t/008_listen-pos-race.pl b/src/test/authentication/t/008_listen-pos-race.pl
index 060e33ed391..c858e8da524 100644
--- a/src/test/authentication/t/008_listen-pos-race.pl
+++ b/src/test/authentication/t/008_listen-pos-race.pl
@@ -8,7 +8,7 @@ use PostgreSQL::Test::Utils;
 use Time::HiRes qw(usleep);
 use Test::More;
 
-if ($ENV{enable_injection_points} ne 'yes') {
+if (($ENV{enable_injection_points} // '') ne 'yes') {
 	plan skip_all => 'Injection points not supported by this build';
 }
 
@@ -18,6 +18,7 @@ $node->start;
 
 if (!$node->check_extension('injection_points'))
 {
+	$node->stop;
 	plan skip_all => 'Extension injection_points not installed';
 }
 