public inbox for [email protected]  
Re: How batch processing works
7+ messages / 4 participants

* Re: How batch processing works
@ 2024-09-21 04:21  Michał Kłeczek <[email protected]>
  0 siblings, 1 reply; 7+ messages in thread

From: Michał Kłeczek @ 2024-09-21 04:21 UTC (permalink / raw)
  To: Lok P <[email protected]>; +Cc: pgsql-general <[email protected]>

Hi,

> On 19 Sep 2024, at 07:30, Lok P <[email protected]> wrote:
> 
[snip]
> 
> Method-4
> 
> INSERT INTO parent_table VALUES  (1, 'a'), (2, 'a');
> INSERT INTO child_table VALUES   (1,1, 'a'), (1,2, 'a');
> commit;

I’ve done some batch processing of JSON messages from Kafka in Java.
By far the most performant way was to:

1. Use prepared statements
2. Parse JSON messages in Postgres
3. Process messages in batches

All three can be achieved by using arrays to pass batches:

WITH parsed AS (
  SELECT msg::json FROM unnest(?) AS t(msg)
),
parents AS (
  INSERT INTO parent SELECT … FROM parsed RETURNING ...
)
INSERT INTO child SELECT … FROM parsed…

Note the single parameter, which you can bind to a String[].
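
The same array-passing idea might look like this from Python with psycopg2, which adapts a Python list to a Postgres array, so the whole batch travels as one bound parameter. This is only a sketch under assumptions: the parent/child columns and the JSON layout (id, t, children) are hypothetical and not from this thread, and to_array_param / insert_batch are illustrative names.

```python
import json

# Hypothetical tables, not from the thread:
#   parent(id int primary key, t text)
#   child(parent_id int, seq int, t text)
# The statement takes exactly one parameter: a text[] of JSON documents.
BATCH_SQL = """
WITH parsed AS (
    SELECT msg::json AS doc FROM unnest(%s::text[]) AS u(msg)
),
parents AS (
    INSERT INTO parent (id, t)
    SELECT (doc->>'id')::int, doc->>'t' FROM parsed
    RETURNING id
)
INSERT INTO child (parent_id, seq, t)
SELECT (doc->>'id')::int, (c->>'seq')::int, c->>'t'
FROM parsed, json_array_elements(doc->'children') AS c
"""


def to_array_param(messages):
    """Serialize each message to a JSON string; the list binds as one array."""
    return [json.dumps(m) for m in messages]


def insert_batch(cursor, messages):
    """Send the whole batch in a single statement and a single round trip."""
    cursor.execute(BATCH_SQL, (to_array_param(messages),))
```

With a real psycopg2 cursor, insert_batch(csr, messages) followed by a commit would load one batch; the JSON parsing then happens inside Postgres rather than in the client.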

Hope that helps.

--
Michal






* Re: How batch processing works
@ 2024-09-21 11:14  Lok P <[email protected]>
  parent: Michał Kłeczek <[email protected]>
  0 siblings, 1 reply; 7+ messages in thread

From: Lok P @ 2024-09-21 11:14 UTC (permalink / raw)
  To: Michał Kłeczek <[email protected]>; +Cc: pgsql-general <[email protected]>

On Sat, Sep 21, 2024 at 9:51 AM Michał Kłeczek <[email protected]> wrote:

> Hi,
>
> > On 19 Sep 2024, at 07:30, Lok P <[email protected]> wrote:
> >
> [snip]
> >
> > Method-4
> >
> > INSERT INTO parent_table VALUES  (1, 'a'), (2, 'a');
> > INSERT INTO child_table VALUES   (1,1, 'a'), (1,2, 'a');
> > commit;
>
> I’ve done some batch processing of JSON messages from Kafka in Java.
> By far the most performant way was to:
>
> 1. Use prepared statements
> 2. Parse JSON messages in Postgres
> 3. Process messages in batches
>
> All three can be achieved by using arrays to pass batches:
>
> WITH parsed AS (
>   SELECT msg::json FROM unnest(?) AS t(msg)
> ),
> parents AS (
>   INSERT INTO parent SELECT … FROM parsed RETURNING ...
> )
> INSERT INTO child SELECT … FROM parsed…
>
> Note the single parameter, which you can bind to a String[].
>
> Hope that helps.
>
>
Got your point.
But I am wondering why we don't see any difference in performance between
method-2 and method-3 above. Does it mean that I am testing this in the
wrong way, or is this the expected behaviour, so that there is no point in
converting row-by-row inserts into a bulk insert, and just changing the
commit frequency does the same job in a row-by-row approach?



* Re: How batch processing works
@ 2024-09-21 14:36  Peter J. Holzer <[email protected]>
  parent: Lok P <[email protected]>
  0 siblings, 1 reply; 7+ messages in thread

From: Peter J. Holzer @ 2024-09-21 14:36 UTC (permalink / raw)
  To: [email protected]

On 2024-09-21 16:44:08 +0530, Lok P wrote:
> But wondering why we don't see any difference in performance between method-2
> and method-3 above.

The code runs completely inside the database. So there isn't much
difference between a single statement which inserts 50 rows and 50
statements which insert 1 row each. The work to be done is (almost) the
same.

This changes once you consider an application which runs outside of the
database (maybe even on a different host). Such an application has to
wait for the result of each statement before it can send the next one.
Now it makes a difference whether you are waiting 50 times for a
statement which does very little or just once for a statement which does
more work.
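
A rough way to see this is to model the elapsed time as one network round trip per statement and per commit, plus the server-side work per row. The model and the numbers below (0.1 ms and 20 ms round trips, 50 microseconds per row) are illustrative assumptions, not measurements from this thread, and the model ignores per-statement overhead and pipelining.

```python
import math


def estimated_seconds(num_rows, rows_per_statement, rows_per_commit,
                      round_trip_s, per_row_s):
    """Round trips for statements and commits, plus per-row server work."""
    statements = math.ceil(num_rows / rows_per_statement)
    commits = math.ceil(num_rows / rows_per_commit)
    return (statements + commits) * round_trip_s + num_rows * per_row_s


# Compare row-by-row inserts with 50-row inserts, both committing every 50 rows.
for label, rtt in (("same host", 0.0001), ("remote", 0.02)):
    row_by_row = estimated_seconds(10_000, 1, 50, rtt, 0.00005)
    multi_row = estimated_seconds(10_000, 50, 50, rtt, 0.00005)
    print(f"{label}: row by row {row_by_row:.2f} s, multi-row {multi_row:.2f} s")
```

When the round-trip time is close to zero the two estimates converge, which matches what a test running entirely inside the database would show.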

> So does it mean that,I am testing this in a wrong way or

That depends on what you want to test. If you are interested in the
behaviour of stored procedures, the test is correct. If you want to know
about the performance of a database client (whether it's written in Java,
Python, Go or whatever), this is the wrong test. You have to write the
test in your target language and run it on the client system to get
realistic results (for example, the round-trip times will be a lot
shorter if the client and database are on the same computer than when
one is in Europe and the other in America).

For example, here are the three methods as Python scripts:

---------------------------------------------------------------------------------------------------
#!/usr/bin/python3

import time
import psycopg2

num_inserts = 10_000

db = psycopg2.connect()
csr = db.cursor()

csr.execute("drop table if exists parent_table")
csr.execute("create table parent_table (id int primary key, t text)")
db.commit()  # commit the DDL before timing starts, as in the other two scripts

start_time = time.monotonic()
for i in range(1, num_inserts+1):
    csr.execute("insert into parent_table values(%s, %s)", (i, 'a'))
    db.commit()
end_time = time.monotonic()
elapsed_time = end_time - start_time
print(f"Method 1: Individual Inserts with Commit after every Row: {elapsed_time:.3} seconds")

# vim: tw=99
---------------------------------------------------------------------------------------------------
#!/usr/bin/python3

import time
import psycopg2

num_inserts = 10_000
batch_size = 50

db = psycopg2.connect()
csr = db.cursor()

csr.execute("drop table if exists parent_table")
csr.execute("create table parent_table (id int primary key, t text)")
db.commit()

start_time = time.monotonic()
for i in range(1, num_inserts+1):
    csr.execute("insert into parent_table values(%s, %s)", (i, 'a'))
    if i % batch_size == 0:
        db.commit()
db.commit()
end_time = time.monotonic()
elapsed_time = end_time - start_time
print(f"Method 2: Individual Inserts with Commit after {batch_size}  Rows: {elapsed_time:.3} seconds")

# vim: tw=99
---------------------------------------------------------------------------------------------------
#!/usr/bin/python3

import itertools
import time
import psycopg2

num_inserts = 10_000
batch_size = 50

db = psycopg2.connect()
csr = db.cursor()

csr.execute("drop table if exists parent_table")
csr.execute("create table parent_table (id int primary key, t text)")
db.commit()

start_time = time.monotonic()
batch = []
for i in range(1, num_inserts+1):
    batch.append((i, 'a'))
    if i % batch_size == 0:
        q = "insert into parent_table values" + ",".join(["(%s, %s)"] * len(batch))
        params = list(itertools.chain.from_iterable(batch))
        csr.execute(q, params)
        db.commit()
        batch = []
if batch:
    q = "insert into parent_table values" + ",".join(["(%s, %s)"] * len(batch))
    # Flatten the remaining (id, t) tuples, exactly as for the full batches above
    csr.execute(q, list(itertools.chain.from_iterable(batch)))
    db.commit()
    batch = []

end_time = time.monotonic()
elapsed_time = end_time - start_time
print(f"Method 3: Batch Inserts ({batch_size})  with Commit after each batch: {elapsed_time:.3} seconds")

# vim: tw=99
---------------------------------------------------------------------------------------------------

On my laptop, method2 is about twice as fast as method3. But if I
connect to a database on the other side of the city, method2 is now more
than 16 times slower than method3, simply because the delay in
communication is now large compared to the time it takes to insert those
rows.
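
The batching logic of the third script could be factored into two small helpers, so that the final partial batch takes the same code path as the full ones. This is a sketch only: chunks and multirow_insert are names made up here, and the table name is interpolated into the SQL text, so it must come from trusted code.

```python
import itertools


def chunks(rows, size):
    """Yield lists of at most `size` rows; the last list may be shorter."""
    it = iter(rows)
    while batch := list(itertools.islice(it, size)):
        yield batch


def multirow_insert(table, batch):
    """Build one multi-row INSERT and its flat parameter list for a batch."""
    row = "(" + ", ".join(["%s"] * len(batch[0])) + ")"
    query = f"insert into {table} values " + ", ".join([row] * len(batch))
    params = list(itertools.chain.from_iterable(batch))
    return query, params


# Usage with a psycopg2 cursor `csr` and connection `db` as in the scripts above:
# for batch in chunks(((i, 'a') for i in range(1, num_inserts + 1)), batch_size):
#     csr.execute(*multirow_insert("parent_table", batch))
#     db.commit()
```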

        hp

-- 
   _  | Peter J. Holzer    | Story must make more sense than reality.
|_|_) |                    |
| |   | [email protected]         |    -- Charles Stross, "Creative writing
__/   | http://www.hjp.at/ |       challenge!"




* Re: How batch processing works
@ 2024-09-21 19:15  Adrian Klaver <[email protected]>
  parent: Peter J. Holzer <[email protected]>
  0 siblings, 2 replies; 7+ messages in thread

From: Adrian Klaver @ 2024-09-21 19:15 UTC (permalink / raw)
  To: [email protected]

On 9/21/24 07:36, Peter J. Holzer wrote:
> On 2024-09-21 16:44:08 +0530, Lok P wrote:

> ---------------------------------------------------------------------------------------------------
> #!/usr/bin/python3
> 
> import time
> import psycopg2
> 
> num_inserts = 10_000
> batch_size = 50
> 
> db = psycopg2.connect()
> csr = db.cursor()
> 
> csr.execute("drop table if exists parent_table")
> csr.execute("create table parent_table (id int primary key, t text)")
> db.commit()
> 
> start_time = time.monotonic()
> for i in range(1, num_inserts+1):
>      csr.execute("insert into parent_table values(%s, %s)", (i, 'a'))
>      if i % batch_size == 0:
>          db.commit()
> db.commit()
> end_time = time.monotonic()
> elapsed_time = end_time - start_time
> print(f"Method 2: Individual Inserts with Commit after {batch_size}  Rows: {elapsed_time:.3} seconds")
> 
> # vim: tw=99
> ---------------------------------------------------------------------------------------------------

FYI, this is less of a problem with psycopg(3) and pipeline mode:

import time
import psycopg

num_inserts = 10_000
batch_size = 50

db = psycopg.connect("dbname=test user=postgres host=104.237.158.68")
csr = db.cursor()

csr.execute("drop table if exists parent_table")
csr.execute("create table parent_table (id int primary key, t text)")
db.commit()

start_time = time.monotonic()
with db.pipeline():
    for i in range(1, num_inserts+1):
        csr.execute("insert into parent_table values(%s, %s)", (i, 'a'))
        if i % batch_size == 0:
            db.commit()
db.commit()
end_time = time.monotonic()
elapsed_time = end_time - start_time
print(f"Method 2: Individual Inserts(psycopg3 pipeline mode) with Commit after {batch_size}  Rows: {elapsed_time:.3} seconds")


For a remote connection to a database in another state, that took the time from:

Method 2: Individual Inserts with Commit after 50  Rows: 2.42e+02 seconds

to:

Method 2: Individual Inserts(psycopg3 pipeline mode) with Commit after 50  Rows: 9.83 seconds

> #!/usr/bin/python3
> 
> import itertools
> import time
> import psycopg2
> 
> num_inserts = 10_000
> batch_size = 50
> 
> db = psycopg2.connect()
> csr = db.cursor()
> 
> csr.execute("drop table if exists parent_table")
> csr.execute("create table parent_table (id int primary key, t text)")
> db.commit()
> 
> start_time = time.monotonic()
> batch = []
> for i in range(1, num_inserts+1):
>      batch.append((i, 'a'))
>      if i % batch_size == 0:
>          q = "insert into parent_table values" + ",".join(["(%s, %s)"] * len(batch))
>          params = list(itertools.chain.from_iterable(batch))
>          csr.execute(q, params)
>          db.commit()
>          batch = []
> if batch:
>      q = "insert into parent_table values" + ",".join(["(%s, %s)"] * len(batch))
>      csr.execute(q, list(itertools.chain.from_iterable(batch)))
>      db.commit()
>      batch = []
> 
> end_time = time.monotonic()
> elapsed_time = end_time - start_time
> print(f"Method 3: Batch Inserts ({batch_size})  with Commit after each batch: {elapsed_time:.3} seconds")
> 
> # vim: tw=99
> ---------------------------------------------------------------------------------------------------

The above can also be handled with execute_batch() and execute_values() 
from:

https://www.psycopg.org/docs/extras.html#fast-execution-helpers
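
For instance, execute_values() expands a single %s placeholder into a multi-row VALUES list and sends page_size rows per statement. A minimal sketch, assuming the parent_table from the scripts above and an open psycopg2 connection; make_rows and load_with_execute_values are illustrative names, and unlike Method 3 this version commits once at the end rather than after each batch.

```python
def make_rows(n):
    """Build the same (id, 'a') rows that the scripts above insert."""
    return [(i, 'a') for i in range(1, n + 1)]


def load_with_execute_values(conn, rows, page_size=50):
    """Insert rows with psycopg2's execute_values, page_size rows per statement."""
    # Imported here so this module can be loaded without psycopg2 installed.
    from psycopg2.extras import execute_values

    with conn.cursor() as cur:
        # The single %s is replaced by a multi-row VALUES list for each page.
        execute_values(cur, "insert into parent_table values %s", rows,
                       page_size=page_size)
    conn.commit()


# Usage, given `db = psycopg2.connect()` as in the scripts above:
# load_with_execute_values(db, make_rows(10_000), page_size=50)
```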

> 
> On my laptop, method2 is about twice as fast as method3. But if I
> connect to a database on the other side of the city, method2 is now more
> than 16 times slower than method3 . Simply because the delay in
> communication is now large compared to the time it takes to insert those
> rows.
> 
>          hp
> 

-- 
Adrian Klaver
[email protected]








* Re: How batch processing works
@ 2024-09-22 05:21  Lok P <[email protected]>
  parent: Adrian Klaver <[email protected]>
  1 sibling, 1 reply; 7+ messages in thread

From: Lok P @ 2024-09-22 05:21 UTC (permalink / raw)
  To: Adrian Klaver <[email protected]>; +Cc: [email protected]

On Sun, Sep 22, 2024 at 12:46 AM Adrian Klaver <[email protected]>
wrote:

> On 9/21/24 07:36, Peter J. Holzer wrote:
> > On 2024-09-21 16:44:08 +0530, Lok P wrote:
>
> [snip]
> >
> > On my laptop, method2 is about twice as fast as method3. But if I
> > connect to a database on the other side of the city, method2 is now more
> > than 16 times slower than method3 . Simply because the delay in
> > communication is now large compared to the time it takes to insert those
> > rows.
> >
>
>
>
>
Thank you. So if I understand correctly, if the client app (from which the
data is streamed/inserted) is in the same data center/zone as the database
(which is the case most of the time), then the batch insert does not appear
to be of much benefit.

This also matters for another concern: people here were afraid of having
triggers on such a high-DML table, because a trigger makes the "batch insert"
automatically converted into "row by row" behind the scenes. But considering
the above results, it looks fine to go with a row-by-row approach (just with
a batch commit in place of the row-by-row commit), and not to worry about
implementing a true batch insert, as that is not making a big difference in
data load performance here.



* Re: How batch processing works
@ 2024-09-22 15:46  Adrian Klaver <[email protected]>
  parent: Lok P <[email protected]>
  0 siblings, 0 replies; 7+ messages in thread

From: Adrian Klaver @ 2024-09-22 15:46 UTC (permalink / raw)
  To: Lok P <[email protected]>; +Cc: [email protected]

On 9/21/24 22:21, Lok P wrote:
> On Sun, Sep 22, 2024 at 12:46 AM Adrian Klaver <[email protected]> wrote:
>
> Thank you. So if I get it correct, if the client app(from which the data 
> is getting streamed/inserted) is in the same data center/zone as the 
> database (which is most of the time the case) then the batch insert does 
> not appear to be much beneficial.

No, the point is that once the client and the database are not on the 
same machine, the network that they communicate across becomes a 
consideration. In a contrived example, the client could be in the 
same data center as the database server and talking to the server via a 
dialup modem, and the data transfer would be worse than for the same client 
talking to a database server 1000 miles away across a fiber optic 
connection. This gets back to plan --> test.
                               /|\        |
                                |   <--  \|/
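
One way to start the "test" half of that loop is to measure the round trip itself from the client machine before choosing a strategy. A small sketch; median_round_trip is a made-up helper that times any callable, so on a real connection it could wrap something like csr.execute("select 1").

```python
import statistics
import time


def median_round_trip(run_once, samples=20):
    """Call run_once() repeatedly and return the median elapsed seconds."""
    timings = []
    for _ in range(samples):
        start = time.monotonic()
        run_once()
        timings.append(time.monotonic() - start)
    return statistics.median(timings)


# Usage with a cursor `csr` from psycopg2 or psycopg:
# rtt = median_round_trip(lambda: csr.execute("select 1"))
# print(f"median round trip: {rtt * 1000:.2f} ms")
```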

> 
> Which also means , people here were afraid of having triggers in such a 
> high dml table as because this will make the "batch insert" 
> automatically  converted into "row by row" behind the scene, but 
> considering the above results, it looks fine to go with a row by row 
> approach (but just having batch commit in place in place of row by row 
> commit). And not to worry about implementing the true batch insert 
> approach as that is not making a big difference here in data load 
> performance.

This is getting ahead of the game. The immediate issues are the deadlocks 
between the concurrent sessions and the duplicate data.

-- 
Adrian Klaver
[email protected]








* Re: How batch processing works
@ 2024-09-22 19:36  Peter J. Holzer <[email protected]>
  parent: Adrian Klaver <[email protected]>
  1 sibling, 0 replies; 7+ messages in thread

From: Peter J. Holzer @ 2024-09-22 19:36 UTC (permalink / raw)
  To: [email protected]

On 2024-09-21 12:15:44 -0700, Adrian Klaver wrote:
> FYI, this is less of problem with psycopg(3) and pipeline mode:
> 
[...]
> with db.pipeline():
>     for i in range(1, num_inserts+1):
>         csr.execute("insert into parent_table values(%s, %s)", (i, 'a'))
>         if i % batch_size == 0:
>             db.commit()
> db.commit()
[...]
> 
> For remote to a database in another state that took the  time from:
> 
> Method 2: Individual Inserts with Commit after 50  Rows: 2.42e+02 seconds
> 
> to:
> 
> Method 2: Individual Inserts(psycopg3 pipeline mode) with Commit after 50
> Rows: 9.83 seconds

Very cool. I'll keep that in mind.

I've been using psycopg 3 for newer projects, but for throwaway code
I've been sticking to psycopg2, simply because it's available from the
repos of all my usual distributions. Psycopg 3 is now packaged in both
Debian and Ubuntu, so that will change.

        hp

-- 
   _  | Peter J. Holzer    | Story must make more sense than reality.
|_|_) |                    |
| |   | [email protected]         |    -- Charles Stross, "Creative writing
__/   | http://www.hjp.at/ |       challenge!"





Thread overview: 7+ messages
2024-09-21 04:21 Re: How batch processing works Michał Kłeczek <[email protected]>
2024-09-21 11:14 ` Lok P <[email protected]>
2024-09-21 14:36   ` Peter J. Holzer <[email protected]>
2024-09-21 19:15     ` Adrian Klaver <[email protected]>
2024-09-22 05:21       ` Lok P <[email protected]>
2024-09-22 15:46         ` Adrian Klaver <[email protected]>
2024-09-22 19:36       ` Peter J. Holzer <[email protected]>
