From: Alex Burkhart
Date: Sat, 18 Jan 2025 19:44:07 +0900
To: pgsql-general@lists.postgresql.org
Subject: Design of a reliable task processing queue

Hey team,

I'm looking for help organizing locks and transactions for a reliable task queue.

REQUIREMENTS

1. Pending actions are persisted to a database, and completed actions leave a trace in it.
2. Application workers pick actions one by one. At any given time, each action can be assigned to at most one worker (transaction).
3. If multiple actions share the same "lock_id", only one of them is processed at a time, and it must be the one with the smallest id.
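Read together, requirements 2 and 3 can be stated as a single rule: at any moment, the only action a free worker may claim in a lock_id group is the unfinished one with the smallest id, and only if no other worker already holds it. The sketch below is a hypothetical in-memory model of that rule, not part of the setup that follows; the `Action` type and `NextEligible` function are illustrative names, and lock_id is assumed to be non-NULL.

```go
package main

import "sort"

// Action is a hypothetical in-memory stand-in for one row of "actions".
type Action struct {
	ID     int
	LockID int64
	Done   bool
}

// NextEligible returns the action a free worker may claim: not done, not
// claimed by another worker, and the smallest unfinished id in its lock_id
// group. Among all such group heads, the smallest id wins.
func NextEligible(actions []Action, claimed map[int]bool) (Action, bool) {
	sorted := append([]Action(nil), actions...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ID < sorted[j].ID })

	// lock_ids whose head (smallest unfinished id) has already been visited.
	headSeen := make(map[int64]bool)
	for _, a := range sorted {
		if a.Done || headSeen[a.LockID] {
			continue
		}
		headSeen[a.LockID] = true // a is the head of its lock_id group
		if !claimed[a.ID] {
			return a, true
		}
		// The head is in progress elsewhere, so the rest of its group waits.
	}
	return Action{}, false
}
```

With the sample rows (ids 1 to 4 with lock_ids 26, 8, 26, 8), a first worker gets action 1, a second gets action 2, and a third finds nothing eligible; once action 1 is done, action 3 becomes eligible.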

MY ATTEMPT

I got something approximate working with the following setup.

=== BEGIN DATABASE QUERY ===

DROP TABLE IF EXISTS actions;

CREATE TABLE actions (
  id SERIAL PRIMARY KEY,
  lock_id BIGINT,
  action VARCHAR(255),
  done BOOLEAN DEFAULT false
);

-- Sample data for testing.

INSERT INTO actions (lock_id, action) VALUES
  (26, 'Create instance 26'),
  (8, 'Update instance 8'),
  (26, 'Update instance 26'),
  (8, 'Delete instance 8');

=== END DATABASE QUERY ===

I use a Go client to simulate workers consuming the "actions" table.

=== BEGIN GO CLIENT ===

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost/database")
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	tx, err := pool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	fmt.Println("BEGIN")

	var (
		id     int32
		lockid int64
		action string
	)
	// Claim the oldest unfinished action that no other worker has locked.
	if err := tx.QueryRow(
		ctx,
		`SELECT id, lock_id, action
		 FROM actions
		 WHERE done = false
		 ORDER BY id
		 LIMIT 1
		 FOR NO KEY UPDATE
		 SKIP LOCKED`,
	).Scan(&id, &lockid, &action); err != nil {
		panic(err)
	}

	fmt.Println("LOCKING", lockid, "...")

	// Serialize workers on the same lock_id; released at COMMIT or ROLLBACK.
	if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockid); err != nil {
		panic(err)
	}

	fmt.Println("EXECUTING", action)

	select {
	case <-ctx.Done():
		// Interrupted: both locks are released when the transaction ends.
	case <-time.After(10 * time.Second):
		// Pretend to do work for 10 seconds, then mark the action done.
		if _, err := tx.Exec(ctx, "UPDATE actions SET done = true WHERE id = $1", id); err != nil {
			panic(err)
		}
		if err := tx.Commit(ctx); err != nil {
			panic(err)
		}
		fmt.Println("DONE")
	}

	fmt.Println("UNLOCKED")
}

=== END GO CLIENT ===

This code generates a transaction like this:

=== BEGIN TRANSACTION ===

BEGIN;

-- Lock one row in the "actions" table.

SELECT id, lock_id, action
FROM actions
WHERE done = false
ORDER BY id
LIMIT 1
FOR NO KEY UPDATE
SKIP LOCKED;

-- Block other transactions that process the same lock_id.

SELECT pg_advisory_xact_lock(%lock_id);

-- Work on the action... mark it done at the end.

UPDATE actions SET done = true WHERE id = %d;

COMMIT;

=== END TRANSACTION ===

This almost does the job. When I run the Go client concurrently, the workers pick actions one by one, and only one transaction per "lock_id" is processed at a time.
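One way to check a concurrent run against requirement 3, rather than reading the console output, is to record the order in which actions commit and verify that each lock_id group finished in ascending id order. A minimal sketch, assuming that commit order has already been collected into a slice of ids (how to collect it is left open); `OrderRespected` is a hypothetical helper, not part of the client.

```go
package main

// OrderRespected reports whether a log of completed action ids, in commit
// order, finished every lock_id group in strictly ascending id order.
// lockOf maps each action id to its lock_id.
func OrderRespected(lockOf map[int]int64, completed []int) bool {
	last := make(map[int64]int) // highest id completed so far per lock_id
	for _, id := range completed {
		lock, ok := lockOf[id]
		if !ok {
			return false // id not present in lockOf
		}
		if prev, seen := last[lock]; seen && id <= prev {
			return false // out of order, or the same action finished twice
		}
		last[lock] = id
	}
	return true
}
```

With the sample data, the logs [1 2 3 4] and [2 1 4 3] pass, while [3 1 2 4] fails because action 3 finished before action 1 of the same lock_id. Commit order is only a proxy for processing order, and the check only sees actions that actually appear in the log.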

However, I'm worried about the gap between taking the row lock and taking the "lock_id" advisory lock. It leaves room for the "smallest id first" part of requirement 3 to be violated.
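The gap can be made concrete with a toy replay over the sample data. Worker A row-locks action 1 (lock_id 26) but has not yet called pg_advisory_xact_lock(26); worker C, skipping the locked rows 1 and 2, row-locks action 3 (also lock_id 26) and takes the advisory lock first. The sketch below is a hypothetical, single-threaded model of the transaction's steps; `queueModel`, `pick`, `lock`, `commit` and `ReplayGap` are illustrative names, not PostgreSQL or pgx APIs.

```go
package main

// queueModel is a toy stand-in for the database: row locks taken by
// FOR NO KEY UPDATE SKIP LOCKED and advisory locks from pg_advisory_xact_lock.
type queueModel struct {
	ids      []int            // action ids in ascending order
	lockOf   map[int]int64    // action id -> lock_id
	done     map[int]bool     // committed done = true
	rowLock  map[int]string   // action id -> worker holding its row lock
	advisory map[int64]string // lock_id -> worker holding its advisory lock
	log      []int            // action ids in commit order
}

// pick mimics: SELECT ... WHERE done = false ORDER BY id LIMIT 1
// FOR NO KEY UPDATE SKIP LOCKED. It returns 0 if no row is available.
func (q *queueModel) pick(worker string) int {
	for _, id := range q.ids {
		if !q.done[id] && q.rowLock[id] == "" {
			q.rowLock[id] = worker
			return id
		}
	}
	return 0
}

// lock mimics pg_advisory_xact_lock; false means the call would block.
func (q *queueModel) lock(worker string, lockID int64) bool {
	if holder, held := q.advisory[lockID]; held && holder != worker {
		return false
	}
	q.advisory[lockID] = worker
	return true
}

// commit marks the action done and releases its row lock and the advisory
// lock on its lock_id, as COMMIT would.
func (q *queueModel) commit(id int) {
	q.done[id] = true
	q.log = append(q.log, id)
	delete(q.rowLock, id)
	delete(q.advisory, q.lockOf[id])
}

// ReplayGap replays one schedule of three workers and returns the commit order.
func ReplayGap() []int {
	q := &queueModel{
		ids:      []int{1, 2, 3, 4},
		lockOf:   map[int]int64{1: 26, 2: 8, 3: 26, 4: 8},
		done:     map[int]bool{},
		rowLock:  map[int]string{},
		advisory: map[int64]string{},
	}
	// A gets row 1 (lock_id 26) but has not taken the advisory lock yet.
	a := q.pick("A")
	// B gets row 2 (lock_id 8) and its advisory lock.
	b := q.pick("B")
	q.lock("B", 8)
	// Rows 1 and 2 are locked, so C gets row 3 (lock_id 26) and wins lock 26.
	c := q.pick("C")
	q.lock("C", 26)
	if q.lock("A", 26) {
		panic("A should be blocked behind C")
	}
	// Row 3 commits first, and row 1 only afterwards.
	q.commit(c)
	if q.lock("A", 26) {
		q.commit(a)
	}
	q.commit(b)
	return q.log
}
```

Here `ReplayGap()` returns [3 1 2]: lock_id 26 is processed out of order. The model ignores timing and the real lock manager; it only shows that the queries as written permit such a schedule.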

QUESTION

Is there a way to improve this attempt and close the gap, or is a completely different strategy needed? I have been brainstorming ways to lock all rows that share the same lock_id value, or to use an ARRAY, but I'm struggling to put together a reliable solution.

Thank you,
Alex