public inbox for [email protected]  
help / color / mirror / Atom feed
From: Shaheed Haque <[email protected]>
To: Alex Burkhart <[email protected]>
Cc: pgsql-general list <[email protected]>
Subject: Re: Design of a reliable task processing queue
Date: Sun, 19 Jan 2025 15:17:14 +0000
Message-ID: <CAHAc2jfAwZ_LuSEQ98m73=xYKexJ6P9KFBKpOkKxmcUA-Pu0Lw@mail.gmail.com> (raw)
In-Reply-To: <CA+vxVFNLn7TOOmE+DRfOJ2z-fpaHYveRg9mq+tYZYG_pckBMfA@mail.gmail.com>
References: <CA+vxVFNLn7TOOmE+DRfOJ2z-fpaHYveRg9mq+tYZYG_pckBMfA@mail.gmail.com>

As with all things distributed, it's useful to start with some notion of
what the word "reliable" means to you. For example, when your pseudo code,
is replaced by the real code, can that fail? And if it fails, is it known
that the work in hand can simply be redone?

Those answers will need to be understood alongside the design of the db
handling, to ensure the real world behaviour is as required.

On Sun, 19 Jan 2025, 08:23 Alex Burkhart, <[email protected]> wrote:

> Hey team,
>
> I'm looking for help to organize locks and transaction for a reliable task
> queue.
>
> REQUIREMENTS
>
> 1. Pending actions are persisted to a database. There's a trace once they
> are done.
> 2. Application workers pick actions one by one. At any given time, each
> action can be assigned to at most one worker (transaction).
> 3. If multiple actions have same "lock_id", only one of them is processed
> at the time. That has to be action with smallest id.
>
> MY ATTEMPT
>
> I got something approximate working with the following setup.
>
> === BEGIN DATABASE QUERY ===
>
> DROP TABLE IF EXISTS actions;
>
> CREATE TABLE actions (
>   id SERIAL PRIMARY KEY,
>   lock_id BIGINT,
>   action VARCHAR(255),
>   done BOOLEAN DEFAULT false
> );
>
> -- Sample data for testing.
>
> INSERT INTO actions (lock_id, action) VALUES
>   (26, 'Create instance 26'),
>   (8, 'Update instance 8'),
>   (26, 'Update instance 26'),
>   (8, 'Delete instance 8');
>
> === END DATABASE QUERY ===
>
> I use Go client to simulate workers utilizing the "actions" table.
>
> === BEGIN GO CLIENT ===
>
> package main
>
> import (
> "context"
> "fmt"
> "os"
> "os/signal"
> "time"
>
> "github.com/jackc/pgx/v5/pgxpool"
> )
>
> func main() {
> ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
> defer cancel()
>
> pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost
> /database")
> if err != nil {
> panic(err)
> }
> defer pool.Close()
>
> tx, err := pool.Begin(ctx)
> if err != nil {
> panic(err)
> }
> defer tx.Rollback(ctx)
>
> fmt.Println("BEGIN")
>
> var (
> id     int32
> lockid int64
> action string
> )
> if err := tx.QueryRow(
> ctx,
> `SELECT id, lock_id, action
>      FROM actions
>      WHERE done = false
>      ORDER BY id
>      LIMIT 1
>      FOR NO KEY UPDATE
>      SKIP LOCKED`,
> ).Scan(&id, &lockid, &action); err != nil {
> panic(err)
> }
>
> fmt.Println("LOCKING", lockid, "...")
>
> if _, err := tx.Exec(ctx, fmt.Sprintf("SELECT pg_advisory_xact_lock(%d)",
> lockid)); err != nil {
> panic(err)
> }
>
> fmt.Println("EXECUTING", action)
>
> select {
> case <-ctx.Done():
> // Pretend to do work for 10 seconds.
> case <-time.After(10 * time.Second):
> if _, err := tx.Exec(ctx, "UPDATE actions SET done = true WHERE id = $1",
> id); err != nil {
> panic(err)
> }
> if err := tx.Commit(ctx); err != nil {
> panic(err)
> }
>
> fmt.Println("DONE")
> }
>
> fmt.Println("UNLOCKED")
> }
>
> === END GO CLIENT ===
>
> This code generates transaction like this.
>
> === BEGIN TRANSACTION ===
>
> BEGIN;
>
> -- Lock one row in "actions" table.
>
> SELECT id, lock_id, action
> FROM actions
> WHERE done = false
> ORDER BY id
> LIMIT 1
> FOR NO KEY UPDATE
> SKIP LOCKED;
>
> -- Lock other transactions that process same lock_id.
>
> SELECT pg_advisory_xact_lock(%lock_id);
>
> -- Work on the action... mark it done at the end.
>
> UPDATE actions SET done = true WHERE id = %d;
>
> COMMIT;
>
> === END TRANSACTION ===
>
> Which almost does the job. By running the Go client concurrently, it picks
> actions one by one and processes only one transaction with same "lock_id"
> at a time.
>
> However I'm worried about the gap between the row lock and "lock_id" lock.
> This leaves a room for requirement "That has to be action with smallest id"
> to be unsatisfied.
>
> QUESTION
>
> Is there a way to improve this attempt and close the gap? Or a completely
> different strategy? I was brainstorming how to lock all rows where columns
> have the same value or using ARRAY but struggle to put together a reliable
> solution.
>
> Thank you,
> Alex
>


view thread (5+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected]
  Subject: Re: Design of a reliable task processing queue
  In-Reply-To: <CAHAc2jfAwZ_LuSEQ98m73=xYKexJ6P9KFBKpOkKxmcUA-Pu0Lw@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox