From: Chao Li
Subject: Re: Optimize LISTEN/NOTIFY
Date: Sun, 26 Oct 2025 12:11:25 +0800
To: Arseniy Mukhin
Cc: Joel Jacobson, Tom Lane, pgsql-hackers
Message-Id: <0BCA1C2D-B92C-459E-B1A6-6D06BA4C62CF@gmail.com>

> On Oct 23, 2025, at 18:02, Arseniy Mukhin wrote:
>
> Hi,
>
> On Thu, Oct 23, 2025 at 11:17 AM Chao Li wrote:
>>
>>
>>
>>> On Oct 21, 2025, at 00:43, Arseniy Mukhin wrote:
>>>
>>>
>>> I
>>> managed to reproduce the race with v20-alt3. I tried to write a TAP
>>> test reproducing the issue, so it was easier to validate changes.
>>> Please find the attached TAP test. I added it to some random package
>>> for simplicity.
>>>
>>
>> With alt3, as we have acquired the notification lock after reading every message to update the POS, I think we can do a little bit more optimization:
>>
>> The notifier: in SignalBackend()
>> * Now we check if a listener’s pos equals to beforeWritePos, then we do “directly advancement”
>> * We can change to if a listener’s pos is between beforeWritePos and afterWritePos, then we can do the advancement.
>>
>> The listener: in asyncQueueReadAllNotifications():
>> * With alt3, we only lock and update pos
>> * We can do more. If current pos in shared memory is after that local pos, then meaning some notifier has done an advancement, so it can stop reading.
>>
>
> I think this would be a reasonable optimization if it weren't for the
> race condition mentioned above. The problem is that if the local pos
> lags behind the shared memory pos, it could point to a truncated queue
> segment, so we shouldn't allow that.
>

I figured out a way to resolve the race condition for alt3:

* Add an awakening flag for every listener; this flag is only set by listeners.
* Add an advisory pos for every listener, similar to alt1.
* If a listener is awake, the notifier only updates the listener’s advisory pos; otherwise it directly advances the listener’s position.
* If a running listener sees that its current pos is behind its advisory pos, it stops reading.

See the attached patch file for more details; I added code comments for my changes.

Now the TAP test won’t hit the race condition.

```
# +++ tap check in src/test/authentication +++
t/008_listen-pos-race.pl .. skipped: Injection points not supported by this build
Files=1, Tests=0,  0 wallclock secs ( 0.00 usr  0.00 sys +  0.03 cusr  0.01 csys =  0.04 CPU)
Result: NOTESTS
```

And with my solution, listeners still use their local pos, so they no longer need to acquire the notification lock in every loop iteration.

The patch stack is: v20 patch -> alt3 patch -> tap patch -> my patch. Please see if my solution works.

I also made a tiny change in the TAP script to allow it to terminate gracefully.

Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/

Attachment: fix-race.patch

commit c7daefa51118d2041623b14b7f26c9177ac0b6cd
Author: Chao Li (Evan)
Date:   Sat Oct 25 15:42:26 2025 +0800

    fix race condition

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e8d728e9ce..3c8a640ebed 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -250,6 +250,11 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	((x).page == (y).page && (x).offset == (y).offset)
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 #define QUEUE_POS_IS_ZERO(x) \
 	((x).page == 0 && (x).offset == 0)
 
@@ -287,7 +292,9 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* advisory position for this backend */
 	bool		wakeupPending;	/* signal sent but not yet processed */
+	bool		awakening;		/* backend is awakening */
 } QueueBackendStatus;
 
 /*
@@ -348,7 +355,9 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_AWAKENING(i)	(asyncQueueControl->backend[i].awakening)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -675,7 +684,9 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+			QUEUE_BACKEND_AWAKENING(i) = false;
 		}
 	}
 
@@ -1954,6 +1965,7 @@ SignalBackends(void)
 	ProcNumber *procnos;
 	int			count;
 	ListCell   *lc;
+	List	   *interestedProcs = NIL;
 
 	INJECTION_POINT("listen-notify-signal-backends", NULL);
 
@@ -2002,6 +2014,12 @@ SignalBackends(void)
 			int32		pid;
 			QueuePosition pos;
 
+			// XXX: Use a list to record listeners interested in any of the pending channels.
+			// A list is not the best choice, so if we decide to take this approach, we
+			// can optimize it later by using a hash or bitmap.
+			if (!list_member_int(interestedProcs, i))
+				interestedProcs = lappend_int(interestedProcs, i);
+
 			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
 
@@ -2054,19 +2072,41 @@ SignalBackends(void)
 		int64		lag;
 		int32		pid;
 
-		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
-			continue;
+		/* XXX we cannot rely on wakeupPending here, because the flag might be set by another notifier. */
+		//if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+		//	continue;
 
 		pos = QUEUE_BACKEND_POS(i);
 
-		/* Direct advancement for idle backends at the old head */
-		if (pendingNotifies != NULL &&
-			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+		if (!list_member_int(interestedProcs, i))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			pos = queueHeadAfterWrite;
+			/* Direct advancement for idle backends at the old head */
+			if (pendingNotifies != NULL)
+			{
+				if (QUEUE_BACKEND_AWAKENING(i))
+				{
+					// For an awakening backend, advise a new position.
+					if ((QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite) ||
+						 (QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos) && QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))) &&
+						QUEUE_POS_EQUAL(QUEUE_BACKEND_ADVISORY_POS(i), queueHeadBeforeWrite))
+						QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+				}
+				else
+				{
+					// For a non-awakening backend, directly advance its position.
+					if (QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+					{
+						QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+						SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
+					}
+				}
+				pos = queueHeadAfterWrite;
+			}
 		}
 
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
+
 		/* Signal backends that have fallen too far behind */
 		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 								 QUEUE_POS_PAGE(pos));
@@ -2321,6 +2361,7 @@ asyncQueueReadAllNotifications(void)
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = true;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2330,6 +2371,9 @@ asyncQueueReadAllNotifications(void)
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
 		/* Nothing to do, we have read all notifications already. */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
@@ -2441,21 +2485,29 @@ asyncQueueReadAllNotifications(void)
 												   page_buffer.buf,
 												   snapshot);
 
-		/*
-		 * Update our position in shared memory. The 'pos' variable now
-		 * holds our new position (advanced past all messages we just
-		 * processed). This ensures that if we fail while processing
-		 * messages from the next page, we won't reprocess the ones we
-		 * just handled. It also prevents us from overwriting any direct
-		 * advancement that another backend might have done while we were
-		 * processing messages.
-		 */
-		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
-		LWLockRelease(NotifyQueueLock);
-
+		// If there is a direct advancement, let's stop reading.
+		// We don't need to lock here because even if the position
+		// changes right after we read it, we just do one more loop.
+		//LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+		{
+			reachedStop = true;
+		}
+		//QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		//LWLockRelease(NotifyQueueLock);
 	} while (!reachedStop);
 
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+	{
+		/* respect direct advancement */
+		pos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+	}
+	SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(MyProcNumber), 0, 0);
+	QUEUE_BACKEND_POS(MyProcNumber) = pos;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+	LWLockRelease(NotifyQueueLock);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
diff --git a/src/test/authentication/t/008_listen-pos-race.pl b/src/test/authentication/t/008_listen-pos-race.pl
index 060e33ed391..c858e8da524 100644
--- a/src/test/authentication/t/008_listen-pos-race.pl
+++ b/src/test/authentication/t/008_listen-pos-race.pl
@@ -8,7 +8,7 @@ use PostgreSQL::Test::Utils;
 use Time::HiRes qw(usleep);
 use Test::More;
 
-if ($ENV{enable_injection_points} ne 'yes') {
+if (($ENV{enable_injection_points} // '') ne 'yes') {
 	plan skip_all => 'Injection points not supported by this build';
 }
 
@@ -18,6 +18,7 @@ $node->start;
 
 if (!$node->check_extension('injection_points'))
 {
+	$node->stop;
 	plan skip_all => 'Extension injection_points not installed';
 }
 