Received: from malur.postgresql.org ([217.196.149.56]) by arkaria.postgresql.org with esmtps (TLS1.3) tls TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 (Exim 4.94.2) (envelope-from ) id 1rkGNo-00BZFH-F5 for pgsql-sql@arkaria.postgresql.org; Wed, 13 Mar 2024 04:38:29 +0000 Received: from localhost ([127.0.0.1] helo=malur.postgresql.org) by malur.postgresql.org with esmtp (Exim 4.94.2) (envelope-from ) id 1rkGNm-00AG83-8l for pgsql-sql@arkaria.postgresql.org; Wed, 13 Mar 2024 04:38:26 +0000 Received: from makus.postgresql.org ([2001:4800:3e1:1::229]) by malur.postgresql.org with esmtps (TLS1.3) tls TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 (Exim 4.94.2) (envelope-from ) id 1rkGNl-00AFxa-IY for pgsql-sql@lists.postgresql.org; Wed, 13 Mar 2024 04:38:26 +0000 Received: from mail-ed1-x535.google.com ([2a00:1450:4864:20::535]) by makus.postgresql.org with esmtps (TLS1.3) tls TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 (Exim 4.94.2) (envelope-from ) id 1rkGNh-004EJf-OY for pgsql-sql@lists.postgresql.org; Wed, 13 Mar 2024 04:38:24 +0000 Received: by mail-ed1-x535.google.com with SMTP id 4fb4d7f45d1cf-5682ecd1f81so813441a12.0 for ; Tue, 12 Mar 2024 21:38:22 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20230601; t=1710304701; x=1710909501; darn=lists.postgresql.org; h=to:subject:message-id:date:from:mime-version:from:to:cc:subject :date:message-id:reply-to; bh=W6kSBiLF6HZPTAOii0e1GJLGQuNwW9pPsMTwUVsXCas=; b=g7HVSkB8B74RZ1jkKQyzjCIOFwVJBcfxpXobNIdfnsWpcv0toT//DwBDPQh8OsyIh1 TD0HTbzPMq/u/MrnC9hKM08KM9B3oNDT+aU0c8nCd5W72lqEcYGFPgRWfMzmsyBh24/z QwOF54IQg9XhIUkthJmNzyJelJSHk2eRxYoE37PugP7yZpcya8SIX7LnIkzL+tAuaaJ/ 915OAuGXKbDOmGAMGC7sbBXQaWiAzUfFyqJtGdL2Nx7U9ub9345ND+tYPx4TAWIWFsKv 5uQWBNl4N5XKRkGZOSzqTZbxrgHEXZpJfatzysxj4lnqEPBJVAST+wi7D11PMQWofHis OWnA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; t=1710304701; x=1710909501; h=to:subject:message-id:date:from:mime-version:x-gm-message-state :from:to:cc:subject:date:message-id:reply-to; bh=W6kSBiLF6HZPTAOii0e1GJLGQuNwW9pPsMTwUVsXCas=; b=oEWbye20B18Q2sMESX/AA3NUo3d3/StPYLImpljD3SaWPWG1Qdo3Wix20VwwmbBrlJ Hy+Q4Zv9XX2z78hqwkVpYRYnr90AGX1mfdeOZ4IPfZ5p3TcLh1sdgDilKzdoFTF3Kx0T HkdBkrEOG++0PkW3MikqvOjQziLyL02Tcdfr28tSv3h1OElLuu8NYrOm2KnCHevM3caF 5rbqfXh9SV66wnmerAW7OgZNXfxFX+VDXclcjvLgQflbCE2LevU6Clr7ue8QJ/F5U5dG IYCMl0Q43U+4viJrTnKelLaHCNr9A/uJmxb/5+ouUdviHg5J9cnB2rV9rOccpY7ewbg4 yFSg== X-Gm-Message-State: AOJu0YzwDAbMia9JAzfMCzbM7sGLJQA7jU0IMZQyrOtBCt09fWhODamd /DbhnAggsUyJwFGsy/XRrX3eOqtYJUsx7gCBIgk+1cnUUpwcrp22GTh8YvH5sENPSfxeFmT2iQn ohZ9TVwj0s/UrNkabp/8+mctS3APXkZpG8YHciA== X-Google-Smtp-Source: AGHT+IHPrHfDwxqhxS4ocdCdPs8Z37kNKV7vVV3QJxxtbqR2E9ho04+haAU0huKmGO3eb3biXCgntcL0oOOgXAdedhU= X-Received: by 2002:a50:9e87:0:b0:566:18ba:6b80 with SMTP id a7-20020a509e87000000b0056618ba6b80mr1425092edf.31.1710304700735; Tue, 12 Mar 2024 21:38:20 -0700 (PDT) MIME-Version: 1.0 From: Evgeny Smirnov Date: Wed, 13 Mar 2024 11:38:09 +0700 Message-ID: Subject: Can portals interleave with other portals To: pgsql-sql@lists.postgresql.org Content-Type: multipart/alternative; boundary="000000000000252b8d0613835885" List-Id: List-Help: List-Subscribe: List-Post: List-Owner: List-Archive: Archived-At: Precedence: bulk --000000000000252b8d0613835885 Content-Type: text/plain; charset="UTF-8" Content-Transfer-Encoding: quoted-printable Greetings! A Postgresql backend is capable of operating multiple portals within a transaction and switching between them on and off. For instance such a code results in an expected outcome: ``` *// The table *users_Fetch *contains users with ids between 1 and 20* val fetchedIds =3D Flux.defer *{* Flux.usingWhen( // 1. Establish cursors databaseClient.sql( "DECLARE fetch_test1 SCROLL CURSOR FOR SELECT * FROM users_Fetch;" + "DECLARE fetch_test2 SCROLL CURSOR FOR SELECT * FROM users_Fetch;") .flatMap *{ *_ *-> *Mono.just(databaseClient) *}*, // 2. Iterate cursors *{* *// concat subscribes its internals sequentially* Flux.concat( databaseClient.sql("MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;") .flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("user= Id") as Int *} }*, databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;") .flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("user= Id") as Int *} }*, // An extended query select in between databaseClient.sql("select userId from users_Fetch;") .flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("user= Id") as Int *} }*, databaseClient.sql("FETCH BACKWARD 5 FROM fetch_test1;") .flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("user= Id") as Int *} }*, databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;") .flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("user= Id") as Int *} }* ) *}*, // 3. Close cursors *{* databaseClient.sql( "CLOSE fetch_test1;" + "CLOSE fetch_test2;").then() *}* ) *}*.`as`(transactionalOperatorFetch::transactional) assertEquals( *listOf*( 4, 5, 6, 7, 8, *// MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;* 1, 2, 3, 4, 5, *// FETCH FORWARD 5 FROM fetch_test2;* 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, *// select userId from users_Fetch;* 7, 6, 5, 4, 3, *// FETCH BACKWARD 5 FROM fetch_test1;* 6, 7, 8, 9, 10), *// FETCH FORWARD 5 FROM fetch_test2;* fetchedIds.collectList().block() ) ``` Is the same possible for conventional selects issued with extended query protocol? From the protocol perspective it would result in the following traffic: 231 53111 5432 PGSQL 109 >Q =E2=80=94=E2=80=94=E2=80=94> BEGIN 232 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D1 Ack=3D54 Win=3D637= 1 Len=3D0 TSval=3D2819351776 TSecr=3D589492423 237 5432 53111 PGSQL 73 P/B/D/E/H =E2=80=94=E2=80=94=E2=80=94> select * f= rom =E2=80=A6; bind B_1; execute B_1, fetch 2 rows; flush 240 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D18 Ack=3D274 Win=3D6= 368 Len=3D0 TSval=3D2819351793 TSecr=3D589492440 245 5432 53111 PGSQL 552 <1/2/T/D/D/s =E2=80=94=E2=80=94=E2=80=94> Data, Da= ta, Portal suspended =E2=80=A6 // Then the same sequence for another prepared statement and portal (lets say B_2) but without a limit in the Execute command and sync at the end. =E2=80=A6 // Then the client proceeds with B_1 till the completion 270 53111 5432 PGSQL 69 > E =E2=80=94=E2=80=94=E2=80=94> execute B_1, fetch= 2 rows, 271 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D925 Ack=3D323 Win=3D= 6367 Len=3D0 TSval=3D2819351846 TSecr=3D589492493 272 53111 5432 PGSQL 61 >H =E2=80=94=E2=80=94=E2=80=94> Flush 274 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D925 Ack=3D328 Win=3D= 6367 Len=3D0 TSval=3D2819351846 TSecr=3D589492493 282 5432 53111 PGSQL 144 Command completi= on 283 53111 5432 TCP 56 53111 =E2=86=92 5432 [ACK] Seq=3D328 Ack=3D1013 Win= =3D6351 Len=3D0 TSval=3D589492496 TSecr=3D2819351849 284 53111 5432 PGSQL 66 >C =E2=80=94=E2=80=94=E2=80=94> Close B_1 285 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D1013 Ack=3D338 Win= =3D6367 Len=3D0 TSval=3D2819351849 TSecr=3D589492496 286 53111 5432 PGSQL 61 >S =E2=80=94=E2=80=94=E2=80=94> Sync 287 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D1013 Ack=3D343 Win= =3D6366 Len=3D0 TSval=3D2819351849 TSecr=3D589492496 293 5432 53111 PGSQL 67 <3/Z 294 53111 5432 TCP 56 53111 =E2=86=92 5432 [ACK] Seq=3D343 Ack=3D1024 Win= =3D6351 Len=3D0 TSval=3D589492498 TSecr=3D2819351851 295 53111 5432 PGSQL 68 >Q =E2=80=94=E2=80=94=E2=80=94> COMMIT I=E2=80=99m interested because such a communication is intrinsic to r2dbc s= cenarios like this ``` val usersWithAccouns =3D Flux.defer *{* *// Select all users* databaseClient.sql("select * from users where userId >=3D $1 and userId <=3D $2") .bind("$1", 1) .bind("$2", 255) .flatMap *{ *r *-> *r.map *{ *row, meta *-> *=E2=80=A6 *} }* .flatMap *{ *user *->* *// For each user select all its accounts* databaseClient.sql("select login from accounts where userId=3D$= 1 limit 1") .bind("$1", user.id) .flatMap *{ *r *-> *r.map *{ *row, meta *-> *=E2=80=A6 *} }= * .reduce =E2=80=A6 * }* *}*.`as`(transactionalOperator::transactional) ``` which results in failure owing to inner requests building up a queue inside the driver. Thanks! --000000000000252b8d0613835885 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable

Greetings!

A Postgresql backend is capable of operating multiple portals within = a transaction and switching between them on and off. For instance such a co= de results in an expected outcome:


```

// The table users_Fetch contains users with ids between 1 = and 20

val fetchedIds =3D Flux.defer {

=C2=A0 =C2=A0 <= /b>Flux.usingWhen(

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 // 1. Establish cursors

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 databaseClient.sql(

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 "DECLARE fetch_test1 SCROLL CUR= SOR FOR SELECT * FROM users_Fetch;" +

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 "DECLARE fetch_test2 SCROLL CUR= SOR FOR SELECT * FROM users_Fetch;")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 .flatMap { _ -> Mono.just(databas= eClient) },


=C2=A0 =C2=A0 =C2=A0 =C2= =A0 // 2. Iterate cursors

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 {

=C2=A0=C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 // concat subscribes its internal= s sequentially

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 Flux.concat(

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 databaseClient.sql("MOVE FORWAR= D 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .flatMap { r -> r.map { row, _ -> row.get("userId") a= s Int } },

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 databaseClient.sql("FETCH FORWA= RD 5 FROM fetch_test2;")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .flatMap { r -> r.map { row, _ -> row.get("userId") a= s Int } },


=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 // An extended query select in between

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 databaseClient.sql("select user= Id from users_Fetch;")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .flatMap { r -> r.map { row, _ -> row.get("userId") a= s Int } },


=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 databaseClient.sql("FETCH BACKW= ARD 5 FROM fetch_test1;")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .flatMap { r -> r.map { row, _ -> row.get("userId") a= s Int } },

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 databaseClient.sql("FETCH FORWA= RD 5 FROM fetch_test2;")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .flatMap { r -> r.map { row, _ -> row.get("userId") a= s Int } }

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 )

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 },


=C2=A0 =C2=A0 =C2=A0 =C2= =A0// 3. Close cursors

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 {

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 databaseClient.sql(

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 "CLOSE fetch_test1;" +

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 "CLOSE fetch_test2;").then= ()

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 }

=C2=A0 =C2=A0 <= /b>)

}.`as`(transactionalOperatorFetch::transactional)


assertEquals(

=C2=A0 =C2=A0 l= istOf(

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 4, 5, 6, 7, 8, // MOVE FORWARD 3 FROM fetch_test1; FETCH FORW= ARD 5 FROM fetch_test1;

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 1, 2, 3, 4, 5, // FETCH FORWARD 5 FROM fetch_test2;

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17= , 18, 19, 20, // select userId from users_Fetch;

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 7, 6, 5, 4, 3, // FETCH BACKWARD 5 FROM fetch_test1;

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 6, 7, 8, 9, 10), // FETCH FORWARD 5 FROM fetch_test2;<= /i>

=C2=A0 =C2=A0 <= /i>fetchedIds.collectList().block()

)

```=C2=A0


Is the same possible for conventional selects issued with extended qu= ery protocol? From the protocol perspective it would result in the followin= g traffic:



231 53111 5432 PGSQL 109 >Q =E2=80=94=E2=80=94=E2=80=94> BEGIN

232 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D1 Ack=3D54 Win=3D63= 71 Len=3D0 TSval=3D2819351776 TSecr=3D589492423

237 5432 53111 PGSQL 73 <C/Z

238 53111 5432 TCP 56 53111 =E2=86=92 5432 [ACK] Seq=3D54 Ack=3D18 Win=3D6= 366 Len=3D0 TSval=3D589492435 TSecr=3D2819351788

// A client issues a select

239 53111 5432 PGSQL 276 >P/B/D/E/H =E2=80=94=E2=80=94=E2=80=94> sel= ect * from =E2=80=A6; bind B_1; execute B_1, fetch 2 rows; flush

240 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D18 Ack=3D274 Win=3D= 6368 Len=3D0 TSval=3D2819351793 TSecr=3D589492440

245 5432 53111 PGSQL 552 <1/2/T/D/D/s =E2=80=94=E2=80=94=E2=80=94> D= ata, Data, Portal suspended

=E2=80=A6

// Then the same sequence for another prepared statement and portal (= lets say B_2) but without a limit in the Execute command and sync at the en= d.=C2=A0

=E2=80=A6

// Then the client proceeds with B_1 till the completion

270 53111 5432 PGSQL 69 > E =E2=80=94=E2=80=94=E2=80=94>=C2=A0execute B_1, fetch 2 rows,

271 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D925 Ack=3D323 Win= =3D6367 Len=3D0 TSval=3D2819351846 TSecr=3D589492493

272 53111 5432 PGSQL 61 >H =E2=80=94=E2=80=94=E2=80=94> Flush

274 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D925 Ack=3D328 Win= =3D6367 Len=3D0 TSval=3D2819351846 TSecr=3D589492493

282 5432 53111 PGSQL 144 <D/C =E2=80=94=E2=80=94=E2=80=94> Command c= ompletion

283 53111 5432 TCP 56 53111 =E2=86=92 5432 [ACK] Seq=3D328 Ack=3D1013 Win= =3D6351 Len=3D0 TSval=3D589492496 TSecr=3D2819351849

284 53111 5432 PGSQL 66 >C =E2=80=94=E2=80=94=E2=80=94> Close B_1

285 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D1013 Ack=3D338 Win= =3D6367 Len=3D0 TSval=3D2819351849 TSecr=3D589492496

286 53111 5432 PGSQL 61 >S =E2=80=94=E2=80=94=E2=80=94> Sync

287 5432 53111 TCP 56 5432 =E2=86=92 53111 [ACK] Seq=3D1013 Ack=3D343 Win= =3D6366 Len=3D0 TSval=3D2819351849 TSecr=3D589492496

293 5432 53111 PGSQL 67 <3/Z=C2=A0

294 53111 5432 TCP 56 53111 =E2=86=92 5432 [ACK] Seq=3D343 Ack=3D1024 Win= =3D6351 Len=3D0 TSval=3D589492498 TSecr=3D2819351851

295 53111 5432 PGSQL 68 >Q =E2=80=94=E2=80=94=E2=80=94> COMMIT


I=E2=80=99m interested because such a communication is intrinsic to r= 2dbc scenarios like this

```

val usersWithAccouns =3D Flux.defer {

=C2=A0 =C2=A0 <= /b>// Select all users

=C2=A0 =C2=A0 <= /i>databaseClient.sql("select * from users where userId >=3D $1 and= userId <=3D $2")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 .bind("$1", 1)

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 .bind("$2", 255)

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 .flatMap { r -> r.map { row, meta -&g= t; =E2=80=A6 } }

=C2=A0=C2=A0 =C2=A0 = =C2=A0 =C2=A0 .flatMap { user ->

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 // For each user select all its accounts=

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 databaseClient.sql("select login from = accounts where userId=3D$1 limit 1")

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .bind("$1", user.id)

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .flatMap { r -> r.m= ap { row, meta -> =E2=80=A6 } }

=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 .reduce =E2=80=A6

=C2=A0 =C2=A0 =C2=A0 = =C2=A0 }

}.`as`(transactionalOperator::transactional)

```

which results in failure owing to inner requests building up a queue = inside the driver.


Thanks!

--000000000000252b8d0613835885--