public inbox for [email protected]
* Design of a reliable task processing queue
@ 2025-01-18 10:44 Alex Burkhart <[email protected]>
2025-01-19 11:37 ` Re: Design of a reliable task processing queue Peter J. Holzer <[email protected]>
2025-01-19 15:17 ` Re: Design of a reliable task processing queue Shaheed Haque <[email protected]>
2025-01-19 16:06 ` Re: Design of a reliable task processing queue [email protected]
2025-01-21 12:21 ` Re: Design of a reliable task processing queue Dominique Devienne <[email protected]>
0 siblings, 4 replies; 5+ messages in thread
From: Alex Burkhart @ 2025-01-18 10:44 UTC
To: [email protected]
Hey team,
I'm looking for help organizing locks and transactions for a reliable task
queue.
REQUIREMENTS
1. Pending actions are persisted to a database, and a trace remains once
they are done.
2. Application workers pick actions one by one. At any given time, each
action can be assigned to at most one worker (transaction).
3. If multiple actions have the same "lock_id", only one of them is
processed at a time, and it has to be the action with the smallest id.
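One way to read requirements 2 and 3 together is sketched below as a small in-memory model in Go. It is only an illustration of the intended rule, not part of the database setup: the Action struct, its Assigned field, and the Eligible function are hypothetical names introduced here. The rule it encodes is that, for each lock_id, only the pending action with the smallest id may be picked, and only while no worker already holds it.

```go
package main

import (
	"fmt"
	"sort"
)

// Action is an in-memory stand-in for one row of the "actions" table.
// Assigned is a hypothetical flag meaning "a worker currently holds this row".
type Action struct {
	ID       int
	LockID   int64
	Done     bool
	Assigned bool
}

// Eligible returns the IDs a free worker may pick right now: for every
// lock_id, only the pending action with the smallest ID, and only if no
// worker is already processing it.
func Eligible(actions []Action) []int {
	head := map[int64]Action{} // lock_id -> pending action with the smallest ID
	for _, a := range actions {
		if a.Done {
			continue
		}
		if h, ok := head[a.LockID]; !ok || a.ID < h.ID {
			head[a.LockID] = a
		}
	}
	var ids []int
	for _, h := range head {
		if !h.Assigned {
			ids = append(ids, h.ID)
		}
	}
	sort.Ints(ids)
	return ids
}

func main() {
	actions := []Action{
		{ID: 1, LockID: 26}, // Create instance 26
		{ID: 2, LockID: 8},  // Update instance 8
		{ID: 3, LockID: 26}, // Update instance 26
		{ID: 4, LockID: 8},  // Delete instance 8
	}
	fmt.Println(Eligible(actions)) // prints [1 2]

	actions[0].Assigned = true     // a worker takes action 1
	fmt.Println(Eligible(actions)) // prints [2]; action 3 must wait for action 1
}
```

Under this reading, a group whose smallest pending action is already in progress offers nothing to other workers until that action is done.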
MY ATTEMPT
I got an approximate solution working with the following setup.
=== BEGIN DATABASE QUERY ===
DROP TABLE IF EXISTS actions;
CREATE TABLE actions (
    id      SERIAL PRIMARY KEY,
    lock_id BIGINT,
    action  VARCHAR(255),
    done    BOOLEAN DEFAULT false
);
-- Sample data for testing.
INSERT INTO actions (lock_id, action) VALUES
    (26, 'Create instance 26'),
    (8, 'Update instance 8'),
    (26, 'Update instance 26'),
    (8, 'Delete instance 8');
=== END DATABASE QUERY ===
I use a Go client to simulate workers that consume the "actions" table.
=== BEGIN GO CLIENT ===
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost/database")
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	tx, err := pool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	// Harmless after a successful Commit; otherwise it releases all locks.
	defer tx.Rollback(ctx)

	fmt.Println("BEGIN")

	var (
		id     int32
		lockid int64
		action string
	)
	// Lock the first pending row that no other worker holds.
	if err := tx.QueryRow(
		ctx,
		`SELECT id, lock_id, action
		FROM actions
		WHERE done = false
		ORDER BY id
		LIMIT 1
		FOR NO KEY UPDATE
		SKIP LOCKED`,
	).Scan(&id, &lockid, &action); err != nil {
		panic(err)
	}

	fmt.Println("LOCKING", lockid, "...")

	// Serialize workers that process the same lock_id.
	// The lock id is passed as a query parameter instead of being formatted into the SQL.
	if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockid); err != nil {
		panic(err)
	}

	fmt.Println("EXECUTING", action)

	select {
	case <-ctx.Done():
		// Interrupted: the deferred Rollback releases both locks.
	case <-time.After(10 * time.Second):
		// Pretend to do work for 10 seconds, then mark the action done.
		if _, err := tx.Exec(ctx, "UPDATE actions SET done = true WHERE id = $1", id); err != nil {
			panic(err)
		}
		if err := tx.Commit(ctx); err != nil {
			panic(err)
		}

		fmt.Println("DONE")
	}

	fmt.Println("UNLOCKED")
}
=== END GO CLIENT ===
This code generates a transaction like this.
=== BEGIN TRANSACTION ===
BEGIN;
-- Lock one row in the "actions" table.
SELECT id, lock_id, action
FROM actions
WHERE done = false
ORDER BY id
LIMIT 1
FOR NO KEY UPDATE
SKIP LOCKED;
-- Block other transactions that process the same lock_id.
SELECT pg_advisory_xact_lock(%lock_id);
-- Work on the action, then mark it done at the end.
UPDATE actions SET done = true WHERE id = %id;
COMMIT;
=== END TRANSACTION ===
This almost does the job. When I run the Go client concurrently, the
workers pick actions one by one and process only one action with the same
"lock_id" at a time.
However, I'm worried about the gap between taking the row lock and taking
the "lock_id" lock. It leaves room for the smallest-id requirement to be
violated.
QUESTION
Is there a way to improve this attempt and close the gap? Or is there a
completely different strategy? I was brainstorming how to lock all rows
where a column has the same value, or how to use an ARRAY, but I struggle
to put together a reliable solution.
Thank you,
Alex
* Re: Design of a reliable task processing queue
2025-01-18 10:44 Design of a reliable task processing queue Alex Burkhart <[email protected]>
@ 2025-01-19 11:37 ` Peter J. Holzer <[email protected]>
3 siblings, 0 replies; 5+ messages in thread
From: Peter J. Holzer @ 2025-01-19 11:37 UTC
To: [email protected]
On 2025-01-18 19:44:07 +0900, Alex Burkhart wrote:
> REQUIREMENTS
>
> 1. Pending actions are persisted to a database. There's a trace once they are
> done.
> 2. Application workers pick actions one by one. At any given time, each action
> can be assigned to at most one worker (transaction).
> 3. If multiple actions have same "lock_id", only one of them is processed at
> the time. That has to be action with smallest id.
>
> MY ATTEMPT
>
> I got something approximate working with the following setup.
>
> === BEGIN DATABASE QUERY ===
>
> DROP TABLE IF EXISTS actions;
>
> CREATE TABLE actions (
> id SERIAL PRIMARY KEY,
> lock_id BIGINT,
> action VARCHAR(255),
> done BOOLEAN DEFAULT false
> );
>
[... removed Go code ...]
> This code generates transaction like this.
>
> === BEGIN TRANSACTION ===
>
> BEGIN;
>
> -- Lock one row in "actions" table.
>
> SELECT id, lock_id, action
> FROM actions
> WHERE done = false
> ORDER BY id
> LIMIT 1
> FOR NO KEY UPDATE
> SKIP LOCKED;
>
> -- Lock other transactions that process same lock_id.
>
> SELECT pg_advisory_xact_lock(%lock_id);
>
> -- Work on the action... mark it done at the end.
>
> UPDATE actions SET done = true WHERE id = %d;
>
> COMMIT;
>
> === END TRANSACTION ===
>
> Which almost does the job. By running the Go client concurrently, it picks
> actions one by one and processes only one transaction with same "lock_id" at a
> time.
>
> However I'm worried about the gap between the row lock and "lock_id" lock. This
> leaves a room for requirement "That has to be action with smallest id" to be
> unsatisfied.
I'd be worried about that, too.
Let's assume you have two entries in your queue:
123, 26, 'Create instance 26', false
124, 26, 'Update instance 26', false
And also two worker threads. Then this scenario is possible:
1) Worker A executes the SELECT ... SKIP LOCKED query and gets the
record with ID 123.
2) Worker B executes the SELECT ... SKIP LOCKED query. Since record 123
is already locked, it gets the record with ID 124.
3) Worker B executes the SELECT pg_advisory_xact_lock(26) query
successfully and can continue.
4) Worker A executes the SELECT pg_advisory_xact_lock(26) query, but
blocks and has to wait for worker B to commit (or roll back).
So in this case the update would happen before the create, which isn't
good.
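The interleaving above can be replayed deterministically with a toy model. The following Go sketch is an assumption-laden illustration, not a reproduction of PostgreSQL's lock manager: the sim type and the pick, tryAdvisory, and replay functions are hypothetical, the model is single-threaded, and "blocking" is represented by tryAdvisory returning false.

```go
package main

import "fmt"

// action mirrors the id and lock_id columns of one queue entry.
type action struct {
	id     int
	lockID int64
}

// sim is a toy, single-threaded model of the two kinds of locks involved.
type sim struct {
	rowLocked map[int]bool     // row id -> locked by some worker
	advisory  map[int64]string // lock_id -> worker holding the advisory lock
}

// pick imitates SELECT ... ORDER BY id LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED.
func (s *sim) pick(queue []action) action {
	for _, a := range queue { // queue is already ordered by id
		if !s.rowLocked[a.id] {
			s.rowLocked[a.id] = true
			return a
		}
	}
	panic("no unlocked row left")
}

// tryAdvisory imitates pg_advisory_xact_lock, but returns false where the
// real call would block.
func (s *sim) tryAdvisory(worker string, lockID int64) bool {
	if holder, held := s.advisory[lockID]; held && holder != worker {
		return false
	}
	s.advisory[lockID] = worker
	return true
}

// replay walks through the four steps and returns the ids in execution order.
func replay() []int {
	queue := []action{{123, 26}, {124, 26}}
	s := &sim{rowLocked: map[int]bool{}, advisory: map[int64]string{}}
	var executed []int

	a := s.pick(queue) // worker A locks row 123
	b := s.pick(queue) // worker B skips 123 and locks row 124

	// Worker B reaches the advisory lock first and gets it.
	if s.tryAdvisory("B", b.lockID) {
		// Worker A would block here until B commits.
		if !s.tryAdvisory("A", a.lockID) {
			executed = append(executed, b.id) // B runs 'Update instance 26'
			delete(s.advisory, b.lockID)      // B commits and releases the lock
		}
	}
	if s.tryAdvisory("A", a.lockID) {
		executed = append(executed, a.id) // A finally runs 'Create instance 26'
	}
	return executed
}

func main() {
	fmt.Println(replay()) // prints [124 123]
}
```

The output order shows the update (124) running before the create (123), which is exactly the violation described.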
> QUESTION
>
> Is there a way to improve this attempt and close the gap?
I'm not sure if I like this solution (it feels a bit hacky), but the
following should work, IMHO:
1) After the first select, do another on all records with the same
lock_id:
    SELECT id, lock_id, action
    FROM actions
    WHERE lock_id = %d AND done = false
    ORDER BY id
    FOR NO KEY UPDATE
2) Check whether the id you got first is the smallest of all.
3) If it isn't, roll back and start over.
4) If it is, you have now locked all the rows with the same lock_id and
can continue.
The advisory lock isn't needed then.
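Steps 1 to 4 could be sketched in Go roughly as follows. This is only a sketch under assumptions: it uses the standard database/sql package rather than pgx, it relies on a PostgreSQL driver that accepts $1 placeholders, it assumes lock_id is never NULL, and the names claim, isSmallest, and errNotHead are made up for illustration. The main function exercises only the pure check from step 2, so the block runs without a database.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// errNotHead means another pending action with the same lock_id has a smaller id.
var errNotHead = errors.New("picked action is not the smallest pending id for its lock_id")

// isSmallest is step 2: the id picked first must be the minimum of the group.
func isSmallest(first int64, group []int64) bool {
	if len(group) == 0 {
		return false
	}
	for _, id := range group {
		if id < first {
			return false
		}
	}
	return true
}

// claim runs the first select plus steps 1 and 2 inside tx. On any error the
// caller is expected to roll back and start over (step 3).
func claim(ctx context.Context, tx *sql.Tx) (int64, string, error) {
	var (
		id, lockID int64
		action     string
	)
	if err := tx.QueryRowContext(ctx,
		`SELECT id, lock_id, action FROM actions
		 WHERE done = false ORDER BY id LIMIT 1
		 FOR NO KEY UPDATE SKIP LOCKED`,
	).Scan(&id, &lockID, &action); err != nil {
		return 0, "", err // sql.ErrNoRows when no unlocked pending row exists
	}

	// Step 1: lock every pending row with the same lock_id. This can block,
	// or be aborted by deadlock detection, while another worker holds a row
	// of the same group.
	rows, err := tx.QueryContext(ctx,
		`SELECT id FROM actions
		 WHERE lock_id = $1 AND done = false ORDER BY id
		 FOR NO KEY UPDATE`, lockID)
	if err != nil {
		return 0, "", err
	}
	defer rows.Close()

	var group []int64
	for rows.Next() {
		var other int64
		if err := rows.Scan(&other); err != nil {
			return 0, "", err
		}
		group = append(group, other)
	}
	if err := rows.Err(); err != nil {
		return 0, "", err
	}

	// Step 2: only continue if our row is the head of its group.
	if !isSmallest(id, group) {
		return 0, "", errNotHead
	}
	// Step 4: all pending rows of the group are now locked by this transaction.
	return id, action, nil
}

func main() {
	fmt.Println(isSmallest(123, []int64{123, 124})) // true: safe to continue
	fmt.Println(isSmallest(124, []int64{123, 124})) // false: roll back and start over
}
```

A caller would wrap claim in a loop that begins a transaction, rolls back on errNotHead or any lock-related error, and tries again.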
hp
--
_ | Peter J. Holzer | Story must make more sense than reality.
|_|_) | |
| | | [email protected] | -- Charles Stross, "Creative writing
__/ | http://www.hjp.at/ | challenge!"
* Re: Design of a reliable task processing queue
2025-01-18 10:44 Design of a reliable task processing queue Alex Burkhart <[email protected]>
@ 2025-01-19 15:17 ` Shaheed Haque <[email protected]>
3 siblings, 0 replies; 5+ messages in thread
From: Shaheed Haque @ 2025-01-19 15:17 UTC
To: Alex Burkhart <[email protected]>; +Cc: pgsql-general list <[email protected]>
As with all things distributed, it's useful to start with some notion of
what the word "reliable" means to you. For example, when your pseudocode
is replaced by the real code, can that fail? And if it fails, is it known
that the work in hand can simply be redone?
Those answers will need to be understood alongside the design of the db
handling, to ensure the real world behaviour is as required.
* Re: Design of a reliable task processing queue
2025-01-18 10:44 Design of a reliable task processing queue Alex Burkhart <[email protected]>
@ 2025-01-19 16:06 ` [email protected]
3 siblings, 0 replies; 5+ messages in thread
From: [email protected] @ 2025-01-19 16:06 UTC
To: [email protected]
On Saturday, 18 January 2025 12:44:07 EET Alex Burkhart wrote:
> Hey team,
>
> I'm looking for help to organize locks and transaction for a reliable task
> queue.
>
> REQUIREMENTS
>
> 1. Pending actions are persisted to a database. There's a trace once they
> are done.
> 2. Application workers pick actions one by one. At any given time, each
> action can be assigned to at most one worker (transaction).
> 3. If multiple actions have same "lock_id", only one of them is processed
> at the time. That has to be action with smallest id.
Why reinvent the wheel and not use production-ready code from projects such as que (Ruby) or pgqueuer (Python)?
--
Regards,
Peter
* Re: Design of a reliable task processing queue
2025-01-18 10:44 Design of a reliable task processing queue Alex Burkhart <[email protected]>
@ 2025-01-21 12:21 ` Dominique Devienne <[email protected]>
3 siblings, 0 replies; 5+ messages in thread
From: Dominique Devienne @ 2025-01-21 12:21 UTC
To: Alex Burkhart <[email protected]>; +Cc: [email protected]
On Sun, Jan 19, 2025 at 9:23 AM Alex Burkhart <[email protected]> wrote:
> I'm looking for help to organize locks and transaction for a reliable task queue.
Have a look at https://github.com/tembo-io/pgmq for inspiration maybe. --DD