From: Shaheed Haque
Date: Sun, 19 Jan 2025 15:17:14 +0000
Subject: Re: Design of a reliable task processing queue
To: Alex Burkhart
Cc: pgsql-general list
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="0000000000002f58d7062c10a4e6"
Precedence: bulk

--0000000000002f58d7062c10a4e6
Content-Type: text/plain; charset="UTF-8"

As with all things distributed, it's useful to start with some notion of
what the word "reliable" means to you. For example, when your pseudocode
is replaced by the real code, can that fail? And if it fails, is it known
that the work in hand can simply be redone?
Those answers will need to be understood alongside the design of the db
handling, to ensure the real-world behaviour is as required.

On Sun, 19 Jan 2025, 08:23 Alex Burkhart wrote:

> Hey team,
>
> I'm looking for help organizing locks and transactions for a reliable
> task queue.
>
> REQUIREMENTS
>
> 1. Pending actions are persisted to a database. There's a trace once
>    they are done.
> 2. Application workers pick actions one by one. At any given time, each
>    action can be assigned to at most one worker (transaction).
> 3. If multiple actions have the same "lock_id", only one of them is
>    processed at a time. That has to be the action with the smallest id.
>
> MY ATTEMPT
>
> I got something approximate working with the following setup.
>
> === BEGIN DATABASE QUERY ===
>
> DROP TABLE IF EXISTS actions;
>
> CREATE TABLE actions (
>   id SERIAL PRIMARY KEY,
>   lock_id BIGINT,
>   action VARCHAR(255),
>   done BOOLEAN DEFAULT false
> );
>
> -- Sample data for testing.
>
> INSERT INTO actions (lock_id, action) VALUES
>   (26, 'Create instance 26'),
>   (8, 'Update instance 8'),
>   (26, 'Update instance 26'),
>   (8, 'Delete instance 8');
>
> === END DATABASE QUERY ===
>
> I use a Go client to simulate workers utilizing the "actions" table.
>
> === BEGIN GO CLIENT ===
>
> package main
>
> import (
> 	"context"
> 	"fmt"
> 	"os"
> 	"os/signal"
> 	"time"
>
> 	"github.com/jackc/pgx/v5/pgxpool"
> )
>
> func main() {
> 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
> 	defer cancel()
>
> 	pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost/database")
> 	if err != nil {
> 		panic(err)
> 	}
> 	defer pool.Close()
>
> 	tx, err := pool.Begin(ctx)
> 	if err != nil {
> 		panic(err)
> 	}
> 	// Safe to call even after a successful Commit.
> 	defer tx.Rollback(ctx)
>
> 	fmt.Println("BEGIN")
>
> 	var (
> 		id     int32
> 		lockid int64
> 		action string
> 	)
> 	// Lock the oldest pending action that no other worker holds.
> 	// Scan returns pgx.ErrNoRows when there is nothing left to pick.
> 	if err := tx.QueryRow(
> 		ctx,
> 		`SELECT id, lock_id, action
> 		 FROM actions
> 		 WHERE done = false
> 		 ORDER BY id
> 		 LIMIT 1
> 		 FOR NO KEY UPDATE
> 		 SKIP LOCKED`,
> 	).Scan(&id, &lockid, &action); err != nil {
> 		panic(err)
> 	}
>
> 	fmt.Println("LOCKING", lockid, "...")
>
> 	// Wait until no other transaction holds the same lock_id.
> 	// The value is passed as a query parameter instead of via fmt.Sprintf.
> 	if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockid); err != nil {
> 		panic(err)
> 	}
>
> 	fmt.Println("EXECUTING", action)
>
> 	select {
> 	case <-ctx.Done():
> 		// Interrupted: the deferred Rollback releases both locks.
> 	case <-time.After(10 * time.Second):
> 		// Pretend to do work for 10 seconds, then mark the action done.
> 		if _, err := tx.Exec(ctx, "UPDATE actions SET done = true WHERE id = $1", id); err != nil {
> 			panic(err)
> 		}
> 		if err := tx.Commit(ctx); err != nil {
> 			panic(err)
> 		}
>
> 		fmt.Println("DONE")
> 	}
>
> 	fmt.Println("UNLOCKED")
> }
>
> === END GO CLIENT ===
>
> This code generates a transaction like this.
>
> === BEGIN TRANSACTION ===
>
> BEGIN;
>
> -- Lock one row in the "actions" table.
>
> SELECT id, lock_id, action
> FROM actions
> WHERE done = false
> ORDER BY id
> LIMIT 1
> FOR NO KEY UPDATE
> SKIP LOCKED;
>
> -- Block other transactions that process the same lock_id.
>
> SELECT pg_advisory_xact_lock(%lock_id);
>
> -- Work on the action... mark it done at the end.
>
> UPDATE actions SET done = true WHERE id = %id;
>
> COMMIT;
>
> === END TRANSACTION ===
>
> Which almost does the job.
> By running the Go client concurrently, it picks actions one by one and
> processes only one transaction with the same "lock_id" at a time.
>
> However, I'm worried about the gap between the row lock and the
> "lock_id" lock. This leaves room for the requirement "That has to be
> the action with the smallest id" to be unsatisfied.
>
> QUESTION
>
> Is there a way to improve this attempt and close the gap? Or a
> completely different strategy? I was brainstorming how to lock all rows
> where columns have the same value, or using ARRAY, but I struggle to put
> together a reliable solution.
>
> Thank you,
> Alex
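
One possible way to close the gap, offered as a sketch rather than a
tested answer: treat a row as eligible only when it is the oldest
unfinished action for its lock_id, so that the row lock alone enforces the
ordering and no second lock is taken afterwards. The Go program below
models that rule in memory using the sample data from the thread. Queue,
Claim, Complete, Release and headOfLineQuery are hypothetical names that
do not appear in the original post. The SQL string is an untested
assumption about how the same rule might be written under READ COMMITTED,
and it does not protect against SERIAL ids that commit out of order.

```go
package main

import (
	"fmt"
	"sort"
)

// headOfLineQuery is an UNTESTED sketch (assumption) of the eligibility
// rule in SQL: only the oldest unfinished row per lock_id can be picked.
const headOfLineQuery = `
SELECT a.id, a.lock_id, a.action
FROM actions a
WHERE a.done = false
  AND NOT EXISTS (
        SELECT 1
        FROM actions b
        WHERE b.lock_id = a.lock_id
          AND b.done = false
          AND b.id < a.id)
ORDER BY a.id
LIMIT 1
FOR NO KEY UPDATE OF a SKIP LOCKED`

// Action mirrors one row of the "actions" table.
type Action struct {
	ID      int32
	LockID  int64
	Name    string
	Done    bool
	Claimed bool // models a row lock held by an open transaction
}

// Queue is a hypothetical, single-threaded stand-in for the table.
type Queue struct {
	actions []*Action
}

// NewQueue stores the actions ordered by id, like ORDER BY id.
func NewQueue(actions ...*Action) *Queue {
	q := &Queue{actions: actions}
	sort.Slice(q.actions, func(i, j int) bool {
		return q.actions[i].ID < q.actions[j].ID
	})
	return q
}

// Claim returns the unclaimed action with the smallest id that is also
// the oldest unfinished action for its lock_id, or nil if there is none.
func (q *Queue) Claim() *Action {
	seen := map[int64]bool{} // lock_ids whose head was already visited
	for _, a := range q.actions {
		if a.Done || seen[a.LockID] {
			continue // finished, or queued behind an older action
		}
		seen[a.LockID] = true
		if a.Claimed {
			continue // held by another worker, like SKIP LOCKED
		}
		a.Claimed = true
		return a
	}
	return nil
}

// Complete models UPDATE ... SET done = true followed by COMMIT.
func (q *Queue) Complete(a *Action) { a.Done, a.Claimed = true, false }

// Release models ROLLBACK: the action becomes eligible again.
func (q *Queue) Release(a *Action) { a.Claimed = false }

func main() {
	q := NewQueue(
		&Action{ID: 1, LockID: 26, Name: "Create instance 26"},
		&Action{ID: 2, LockID: 8, Name: "Update instance 8"},
		&Action{ID: 3, LockID: 26, Name: "Update instance 26"},
		&Action{ID: 4, LockID: 8, Name: "Delete instance 8"},
	)
	first := q.Claim()
	second := q.Claim()
	third := q.Claim() // nil: ids 3 and 4 wait behind ids 1 and 2
	fmt.Println(first.ID, second.ID, third == nil)

	q.Release(second) // the worker failed, the work must be redone
	q.Complete(first) // the worker committed
	retry := q.Claim()
	next := q.Claim()
	fmt.Println(retry.ID, next.ID)
}
```

With the sample data, the first two claims return ids 1 and 2 and a third
worker finds nothing. After id 1 completes and id 2 is released, the next
claims return id 2 again and then id 3, while id 4 keeps waiting. Whether
a released action can simply be redone is the question raised at the top
of this reply, and the model does not answer it.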
--0000000000002f58d7062c10a4e6--