Received: from malur.postgresql.org ([217.196.149.56]) by arkaria.postgresql.org with esmtps (TLS1.3) tls TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 (Exim 4.94.2) (envelope-from ) id 1upNY0-0040Dz-Mg for pgsql-admin@arkaria.postgresql.org; Fri, 22 Aug 2025 08:54:58 +0000 Received: from localhost ([127.0.0.1] helo=malur.postgresql.org) by malur.postgresql.org with esmtp (Exim 4.94.2) (envelope-from ) id 1upNXz-004ENG-Sj for pgsql-admin@arkaria.postgresql.org; Fri, 22 Aug 2025 08:54:56 +0000 Received: from makus.postgresql.org ([2001:4800:3e1:1::229]) by malur.postgresql.org with esmtps (TLS1.3) tls TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 (Exim 4.94.2) (envelope-from ) id 1upNXz-004ELX-4v for pgsql-admin@lists.postgresql.org; Fri, 22 Aug 2025 08:54:56 +0000 Received: from cloud.gatewaynet.com ([185.90.37.94]) by makus.postgresql.org with smtp (Exim 4.96) (envelope-from ) id 1upNXw-0019zG-2C for pgsql-admin@lists.postgresql.org; Fri, 22 Aug 2025 08:54:54 +0000 Content-Type: multipart/mixed; boundary="------------gblD1txa0httDB5gUF5yeuHa" Message-ID: Date: Fri, 22 Aug 2025 09:54:51 +0100 MIME-Version: 1.0 Subject: Re: The same again with 16.9 : was Re: PostgreSQL 16.6 , query stuck with STAT Ssl, wait_event_type : IPC , wait_event : ParallelFinish To: Laurenz Albe , pgsql-admin@lists.postgresql.org References: <3049794.1748751598@sss.pgh.pa.us> <6be8d715-a9a7-4be8-8ebc-8b6bdb98da2e@cloud.gatewaynet.com> <1332203e-fe17-4800-aadc-4de4a93fc85d@cloud.gatewaynet.com> <4f9c0d5dd74fa057481778fbe925f4817f9322ce.camel@cybertec.at> Content-Language: en-US From: Achilleas Mantzios In-Reply-To: <4f9c0d5dd74fa057481778fbe925f4817f9322ce.camel@cybertec.at> List-Id: List-Help: List-Subscribe: List-Post: List-Owner: List-Archive: Archived-At: Precedence: bulk This is a multi-part message in MIME format. --------------gblD1txa0httDB5gUF5yeuHa Content-Type: multipart/alternative; boundary="------------hbgBkh3H2K56qRQf5f7HTI4X" --------------hbgBkh3H2K56qRQf5f7HTI4X Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: quoted-printable On 8/22/25 09:29, Laurenz Albe wrote: > On Fri, 2025-08-22 at 09:16 +0100, Achilleas Mantzios wrote: >> we had the same problem today again. >> >> postgres@[local]/dynacom=3D# select * from pg_stat_activity where appl= ication_name~*'dbmirr'; >> =C2=A0-[ RECORD 1 ]----+---------------------------------------------= -------------------------------------------------- >> [...] >> =C2=A0pid =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0| 1821681 >> [...] >> =C2=A0wait_event_type =C2=A0| IPC >> =C2=A0wait_event =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0| ParallelFinish >> =C2=A0state =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0=C2=A0| active >> >> postgres@smadb:~$ ps -u -p 1821681 >> =C2=A0USER =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0PID %CPU %= MEM =C2=A0=C2=A0=C2=A0VSZ =C2=A0=C2=A0RSS TTY =C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0STAT START =C2=A0=C2=A0TIME COMMAND >> =C2=A0postgres 1821681 =C2=A00.5 =C2=A04.8 37111844 3177260 ? =C2=A0=C2= =A0=C2=A0Ssl =C2=A003:58 =C2=A0=C2=A02:25 postgres: postgres dynacom 10.9= .0.10(45051) SELECT > Did you check whether the Perl function you mentioned before starts thr= eads, > like Tom suggested? That would be the probable cause, and the solution= is > not to start any threads in a PostgreSQL function. Hi not att all , here is the code : Main(); sub Main() { =C2=A0my $batchTxMode =3D 0; #run the configuration file. =C2=A0#if ($#ARGV !=3D 0) { =C2=A0if ($#ARGV < 0) { =C2=A0=C2=A0=C2=A0die "usage: DBMirror.pl configFile\n"; =C2=A0} =C2=A0elsif ($#ARGV =3D=3D 1) { =C2=A0=C2=A0=C2=A0if ($ARGV[1] eq "batch") { ##run scp at the end of dbm= irror.pl main=20 loop =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$batchTxMode =3D 1; =C2=A0=C2=A0=C2=A0} =C2=A0=C2=A0=C2=A0elsif ($ARGV[1] eq "batchall") { ##run scp at the end = of dbmirror.sh=20 wrapper script =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$batchTxMode =3D 2; =C2=A0=C2=A0=C2=A0} =C2=A0=C2=A0=C2=A0else { =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0die "usage: DBMirror.pl configFile [batch|= batchall]\n"; =C2=A0=C2=A0=C2=A0} =C2=A0} =C2=A0if( ! defined do $ARGV[0]) { =C2=A0=C2=A0=C2=A0logErrorMessage("Invalid Configuration file $ARGV[0]")= ; =C2=A0=C2=A0=C2=A0die; =C2=A0} =C2=A0#my $connectString =3D "host=3D$::masterHost dbname=3D$::masterDb=20 user=3D$::masterUser password=3D$::masterPassword"; =C2=A0my $connectString =3D "host=3Dlocalhost port=3D6432 dbname=3D$::ma= sterDb=20 user=3D$::masterUser password=3D$::masterPassword"; =C2=A0$masterConn =3D Pg::connectdb($connectString); =C2=A0unless($masterConn->status =3D=3D PGRES_CONNECTION_OK) { =C2=A0=C2=A0=C2=A0logErrorMessage("Can't connect to master database\n" . =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$masterConn->errorMessage); =C2=A0=C2=A0=C2=A0die; =C2=A0} =C2=A0my $setQuery; =C2=A0$setQuery =3D "SET search_path =3D public; SET application_name =3D= =20 'DBMIRROR'"; =C2=A0my $setResult =3D $masterConn->exec($setQuery); =C2=A0if($setResult->resultStatus!=3DPGRES_COMMAND_OK) { =C2=A0=C2=A0=C2=A0logErrorMessage($masterConn->errorMessage . "\n" . =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$setQuery); =C2=A0=C2=A0=C2=A0die; =C2=A0} =C2=A0my $setQuery2; =C2=A0$setQuery2 =3D "BEGIN"; =C2=A0my $setResult2 =3D $masterConn->exec($setQuery2); =C2=A0if($setResult2->resultStatus!=3DPGRES_COMMAND_OK) { =C2=A0=C2=A0=C2=A0logErrorMessage($masterConn->errorMessage . "\n" . =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$setQuery2); =C2=A0=C2=A0=C2=A0die; =C2=A0} =C2=A0=C2=A0=C2=A0setupSlave($::slaveInfo); #print $::slaveInfo->{"uucpnode"} . "\n"; #LOCK CODE!!!! =C2=A0=C2=A0=C2=A0my $pendingLockQuery =3D "SELECT 1 FROM dbmirror_Pendi= ng pd"; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " LEFT JOIN dbmirror_MirroredTr= ansaction mt=20 INNER JOIN"; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " dbmirror_MirrorHost mh ON mt.= MirrorHostId =3D "; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " mh.MirrorHostId AND mh.HostNa= me=3D"; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " '$::slaveInfo->{\"slaveHost\"= }' "; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " ON pd.XID"; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " =3D mt.XID WHERE mt.XID is nu= ll and (pd.slaveid=20 is null or pd.slaveid =3D '$::slaveInfo->{\"MirrorHostId\"}') "; =C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " FOR UPDATE OF pd "; =C2=A0=C2=A0=C2=A0my $pendingLockResults =3D $masterConn->exec($pendingL= ockQuery); =C2=A0=C2=A0=C2=A0unless($pendingLockResults->resultStatus=3D=3DPGRES_TU= PLES_OK) { =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0logErrorMessage("Can't query pending table= \n" .=20 $masterConn->errorMessage); =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0die; =C2=A0=C2=A0=C2=A0} #END LOCK CODE!!!! =C2=A0=C2=A0=C2=A0#Obtain a list of pending transactions using ordering = by our=20 approximation =C2=A0=C2=A0=C2=A0#to the commit time. =C2=A0The commit time approximati= on is taken to be the =C2=A0=C2=A0=C2=A0#SeqId of the last row edit in the transaction. =C2=A0=C2=A0=C2=A0my $pendingTransQuery =3D "SELECT pd.XID,MAX(SeqId) FR= OM=20 dbmirror_Pending pd"; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " LEFT JOIN dbmirror_MirroredT= ransaction mt=20 INNER JOIN"; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " dbmirror_MirrorHost mh ON mt= .MirrorHostId =3D "; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " mh.MirrorHostId AND mh.HostN= ame=3D"; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " '$::slaveInfo->{\"slaveHost\= "}' "; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " ON pd.XID"; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " =3D mt.XID WHERE mt.XID is n= ull and=20 (pd.slaveid is null or pd.slaveid =3D '$::slaveInfo->{\"MirrorHostId\"}')= "; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " GROUP BY pd.XID "; =C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " ORDER BY MAX(pd.SeqId)"; It got stuck inside the second query, after the FOR UPDATE locking . I attach the client program, just for completeness. It just queries the=20 three tables : - dbmirror_pending - dbmirror_mirroredtransaction - dbmirror_mirrorhost first it tries do lock via FOR UPDATE , then queries the tables again. > > Yours, > Laurenz Albe --------------hbgBkh3H2K56qRQf5f7HTI4X Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable


On 8/22/25 09:29, Laurenz Albe wrote:<= br>
On Fri, 2025-08-22 at 09:16 =
+0100, Achilleas Mantzios wrote:
we had the same problem to=
day again.

postgres@[local]/dynacom=3D# select * from pg_stat_activity where applica=
tion_name~*'dbmirr';=20
=C2=A0-[ RECORD 1 ]----+-------------------------------------------------=
----------------------------------------------
[...]
=C2=A0pid =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=
=A0=C2=A0=C2=A0| 1821681=20
[...]
=C2=A0wait_event_type =C2=A0| IPC=20
=C2=A0wait_event =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0| ParallelFinish=20
=C2=A0state =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=
=A0| active=20

postgres@smadb:~$ ps -u -p 1821681=20
=C2=A0USER =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0PID %CPU %MEM =
=C2=A0=C2=A0=C2=A0VSZ =C2=A0=C2=A0RSS TTY =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0S=
TAT START =C2=A0=C2=A0TIME COMMAND=20
=C2=A0postgres 1821681 =C2=A00.5 =C2=A04.8 37111844 3177260 ? =C2=A0=C2=A0=
=C2=A0Ssl =C2=A003:58 =C2=A0=C2=A02:25 postgres: postgres dynacom 10.9.0.=
10(45051) SELECT=20
Did you check whether the Perl function you mentioned before starts threa=
ds,
like Tom suggested?  That would be the probable cause, and the solution i=
s
not to start any threads in a PostgreSQL function.

Hi not att all , here is the code :

Main();

sub Main() {
=C2=A0my $batchTxMode =3D 0;
=C2=A0=C2=A0
#run the configuration file.
=C2=A0#if ($#ARGV !=3D 0) {
=C2=A0if ($#ARGV < 0) {
=C2=A0=C2=A0=C2=A0die "usage: DBMirror.pl configFile\n";
=C2=A0}
=C2=A0elsif ($#ARGV =3D=3D 1) {
=C2=A0=C2=A0=C2=A0if ($ARGV[1] eq "batch") { ##run scp at the end= of dbmirror.pl main loop
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$batchTxMode =3D 1;
=C2=A0=C2=A0=C2=A0}
=C2=A0=C2=A0=C2=A0elsif ($ARGV[1] eq "batchall") { ##run scp at t= he end of dbmirror.sh wrapper script
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$batchTxMode =3D 2;
=C2=A0=C2=A0=C2=A0}
=C2=A0=C2=A0=C2=A0else {
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0die "usage: DBMirror.pl configFile = [batch|batchall]\n";
=C2=A0=C2=A0=C2=A0}
=C2=A0}
=C2=A0if( ! defined do $ARGV[0]) {
=C2=A0=C2=A0=C2=A0logErrorMessage("Invalid Configuration file $AR= GV[0]");
=C2=A0=C2=A0=C2=A0die;
=C2=A0}

=C2=A0
=C2=A0#my $connectString =3D "host=3D$::masterHost dbname=3D$::ma= sterDb user=3D$::masterUser password=3D$::masterPassword";
=C2=A0my $connectString =3D "host=3Dlocalhost port=3D6432 dbname=3D$::masterDb user=3D$::masterUser password=3D$::masterPassword";
=C2=A0=C2=A0
=C2=A0$masterConn =3D Pg::connectdb($connectString);
=C2=A0=C2=A0
=C2=A0unless($masterConn->status =3D=3D PGRES_CONNECTION_OK) {
=C2=A0=C2=A0=C2=A0logErrorMessage("Can't connect to master databa= se\n" .
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$masterConn->errorMess= age);
=C2=A0=C2=A0=C2=A0die;
=C2=A0}
=C2=A0=C2=A0=C2=A0=C2=A0
=C2=A0my $setQuery;
=C2=A0$setQuery =3D "SET search_path =3D public; SET application_= name =3D 'DBMIRROR'";
=C2=A0my $setResult =3D $masterConn->exec($setQuery);
=C2=A0if($setResult->resultStatus!=3DPGRES_COMMAND_OK) { =C2=A0=
=C2=A0=C2=A0=C2=A0logErrorMessage($masterConn->errorMessage . = "\n" . =C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$setQuery);
=C2=A0=C2=A0=C2=A0die;
=C2=A0}
=C2=A0=C2=A0=C2=A0=C2=A0
=C2=A0my $setQuery2;
=C2=A0$setQuery2 =3D "BEGIN";
=C2=A0my $setResult2 =3D $masterConn->exec($setQuery2);
=C2=A0if($setResult2->resultStatus!=3DPGRES_COMMAND_OK) { =C2=A0=
=C2=A0=C2=A0=C2=A0logErrorMessage($masterConn->errorMessage . = "\n" . =C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0$setQuery2);
=C2=A0=C2=A0=C2=A0die;
=C2=A0}
=C2=A0=C2=A0=C2=A0=C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0
=C2=A0=C2=A0=C2=A0setupSlave($::slaveInfo);
#print $::slaveInfo->{"uucpnode"} . "\n";
#LOCK CODE!!!!
=C2=A0=C2=A0=C2=A0my $pendingLockQuery =3D "SELECT 1 FROM dbmirro= r_Pending pd";
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " LEFT JOIN dbmirror_Mir= roredTransaction mt INNER JOIN";
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " dbmirror_MirrorHost mh= ON mt.MirrorHostId =3D ";
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " mh.MirrorHostId AND mh= .HostName=3D";
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " '$::slaveInfo->{\"s= laveHost\"}' "; =C2=A0
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " ON pd.XID";
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " =3D mt.XID WHERE mt.XI= D is null and (pd.slaveid is null or pd.slaveid =3D '$::slaveInfo->{\"MirrorHostId\"}') ";
=C2=A0=C2=A0=C2=A0$pendingLockQuery .=3D " FOR UPDATE OF pd ";
=C2=A0=C2=A0=C2=A0=C2=A0
=C2=A0=C2=A0=C2=A0my $pendingLockResults =3D $masterConn->exec($pendingLockQuery);
=C2=A0=C2=A0=C2=A0unless($pendingLockResults->resultStatus=3D=3D= PGRES_TUPLES_OK) {
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0logErrorMessage("Can't query pendin= g table\n" . $masterConn->errorMessage);
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0die;
=C2=A0=C2=A0=C2=A0}
#END LOCK CODE!!!!
=C2=A0=C2=A0=C2=A0
=C2=A0=C2=A0=C2=A0#Obtain a list of pending transactions using or= dering by our approximation
=C2=A0=C2=A0=C2=A0#to the commit time. =C2=A0The commit time appr= oximation is taken to be the
=C2=A0=C2=A0=C2=A0#SeqId of the last row edit in the transaction.
=C2=A0=C2=A0=C2=A0my $pendingTransQuery =3D "SELECT pd.XID,MAX(Se= qId) FROM dbmirror_Pending pd";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " dbmirror_MirrorHost m= h ON mt.MirrorHostId =3D ";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " mh.MirrorHostId AND m= h.HostName=3D";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " '$::slaveInfo->{\"= slaveHost\"}' "; =C2=A0
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " ON pd.XID";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " =3D mt.XID WHERE mt.X= ID is null and (pd.slaveid is null or pd.slaveid =3D '$::slaveInfo->{\"MirrorHostId\"}') ";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " GROUP BY pd.XID ";
=C2=A0=C2=A0=C2=A0$pendingTransQuery .=3D " ORDER BY MAX(pd.SeqId= )";
=C2=A0=C2=A0=C2=A0
It got stuck inside the second query, after the FOR UPDATE locking=C2=A0 .

I attach the client program, just for completeness. It just queries the three tables :

- dbmirror_pend= ing

- dbmirror_mirr= oredtransaction

- dbmirror_mirr= orhost

first it trie= s do lock via FOR UPDATE , then queries the tables again.


Yours,
Laurenz Albe
--------------hbgBkh3H2K56qRQf5f7HTI4X-- --------------gblD1txa0httDB5gUF5yeuHa Content-Type: application/x-perl; name="AsyncMirrorWithSpecialTablesFastIIEsc.pl" Content-Disposition: attachment; filename="AsyncMirrorWithSpecialTablesFastIIEsc.pl" Content-Transfer-Encoding: 7bit #!/usr/bin/perl ############################################################################# # # DBMirror.pl # Contains the Database mirroring script. # This script queries the pending table off the database specified # (along with the associated schema) for updates that are pending on a # specific host. The database on that host is then updated with the changes. # # # Written by Steven Singer (ssinger@navtechinc.com) # (c) 2001-2002 Navtech Systems Support Inc. # ALL RIGHTS RESERVED; # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose, without fee, and without a written agreement # is hereby granted, provided that the above copyright notice and this # paragraph and the following two paragraphs appear in all copies. # # IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING # LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS # DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, # INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS # ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO # PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. # # # # ############################################################################## # $Id: AsyncMirrorWithSpecialTables.pl,v 1.5 2004/11/01 09:07:31 achill4 Exp $ # ############################################################################## =head1 NAME DBMirror.pl - A Perl module to mirror database changes from a master database to a slave. =head1 SYNPOSIS DBMirror.pl slaveConfigfile.conf =head1 DESCRIPTION This Perl script will connect to the master database and query its pending table for a list of pending changes. The transactions of the original changes to the master will be preserved when sending things to the slave. =cut =head1 METHODS =over 4 =cut BEGIN { # add in a global path to files # Pg should be included. } use strict; use Pg; use IO::Handle; #use warnings; sub mirrorCommand($$$$$$$); sub mirrorInsert($$$$$$); sub mirrorDelete($$$$$$); sub mirrorUpdate($$$$$$); sub logErrorMessage($); sub setupSlave($); sub updateMirrorHostTable($$$); sub extractData($$); local $::masterHost; local $::masterDb; local $::masterUser; local $::masterPassword; local $::errorThreshold=5; local $::errorEmailAddr=undef; my %slaveInfoHash; local $::slaveInfo = \%slaveInfoHash; my $lastErrorMsg; my $repeatErrorCount=0; my $lastXID; my $commandCount=0; my $masterConn; Main(); sub Main() { my $batchTxMode = 0; #run the configuration file. #if ($#ARGV != 0) { if ($#ARGV < 0) { die "usage: DBMirror.pl configFile\n"; } elsif ($#ARGV == 1) { if ($ARGV[1] eq "batch") { ##run scp at the end of dbmirror.pl main loop $batchTxMode = 1; } elsif ($ARGV[1] eq "batchall") { ##run scp at the end of dbmirror.sh wrapper script $batchTxMode = 2; } else { die "usage: DBMirror.pl configFile [batch|batchall]\n"; } } if( ! defined do $ARGV[0]) { logErrorMessage("Invalid Configuration file $ARGV[0]"); die; } #my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; my $connectString = "host=localhost port=6432 dbname=$::masterDb user=$::masterUser password=$::masterPassword"; $masterConn = Pg::connectdb($connectString); unless($masterConn->status == PGRES_CONNECTION_OK) { logErrorMessage("Can't connect to master database\n" . $masterConn->errorMessage); die; } my $setQuery; $setQuery = "SET search_path = public; SET application_name = 'DBMIRROR'"; my $setResult = $masterConn->exec($setQuery); if($setResult->resultStatus!=PGRES_COMMAND_OK) { logErrorMessage($masterConn->errorMessage . "\n" . $setQuery); die; } my $setQuery2; $setQuery2 = "BEGIN"; my $setResult2 = $masterConn->exec($setQuery2); if($setResult2->resultStatus!=PGRES_COMMAND_OK) { logErrorMessage($masterConn->errorMessage . "\n" . $setQuery2); die; } setupSlave($::slaveInfo); #print $::slaveInfo->{"uucpnode"} . "\n"; #LOCK CODE!!!! my $pendingLockQuery = "SELECT 1 FROM dbmirror_Pending pd"; $pendingLockQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN"; $pendingLockQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = "; $pendingLockQuery .= " mh.MirrorHostId AND mh.HostName="; $pendingLockQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; $pendingLockQuery .= " ON pd.XID"; $pendingLockQuery .= " = mt.XID WHERE mt.XID is null and (pd.slaveid is null or pd.slaveid = '$::slaveInfo->{\"MirrorHostId\"}') "; $pendingLockQuery .= " FOR UPDATE OF pd "; my $pendingLockResults = $masterConn->exec($pendingLockQuery); unless($pendingLockResults->resultStatus==PGRES_TUPLES_OK) { logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); die; } #END LOCK CODE!!!! #Obtain a list of pending transactions using ordering by our approximation #to the commit time. The commit time approximation is taken to be the #SeqId of the last row edit in the transaction. my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd"; $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN"; $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = "; $pendingTransQuery .= " mh.MirrorHostId AND mh.HostName="; $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; $pendingTransQuery .= " ON pd.XID"; $pendingTransQuery .= " = mt.XID WHERE mt.XID is null and (pd.slaveid is null or pd.slaveid = '$::slaveInfo->{\"MirrorHostId\"}') "; $pendingTransQuery .= " GROUP BY pd.XID "; $pendingTransQuery .= " ORDER BY MAX(pd.SeqId)"; # ^^^^^^^^^^^^^^^^^^^^^^^ # ||||||||||||||||||||||| # na to ksanadw!!!!!!!!!!!!! my $pendingTransResults = $masterConn->exec($pendingTransQuery); unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) { logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); die; } my $numPendingTrans = $pendingTransResults->ntuples; my $curTransTuple = 0; # # This loop loops through each pending transaction in the proper order. # The Pending row edits for that transaction will be queried from the # master and sent + committed to the slaves. while($curTransTuple < $numPendingTrans) { my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); my $isexpuncond='f'; my $seqId; #bebaia na paei sto swsto smaba dir tou vessel. #Epishs na antigrafei kai sto parallhlo dir. my $fileName=">/usr/local/var/PARALLEL_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."/"."D".$maxSeqId.".xml"; my $xfile; open($xfile,$fileName) or die "Can't open $fileName : $!"; my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,"; $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data,pnd.slaveid is null as isexpuncond "; $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata "; $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId AND (pnd.slaveid is null or pnd.slaveid= '$::slaveInfo->{\"MirrorHostId\"}' ) AND "; $pendingQuery .= " pnd.XID=$XID ORDER BY SeqId, IsKey DESC"; my $pendingResults = $masterConn->exec($pendingQuery); unless($pendingResults->resultStatus==PGRES_TUPLES_OK) { logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); die; } my $numPending = $pendingResults->ntuples; my $curTuple = 0; # print $xfile "BEGIN;\n"; print $xfile "\n"; print $xfile "{\"MirrorHostId\"}\">REPLICgetvalue($curTuple,0); my $tableName = $pendingResults->getvalue($curTuple,1); my $op = $pendingResults->getvalue($curTuple,2); my $foo = $pendingResults->getvalue($curTuple,5); if ($foo eq 't') { $isexpuncond = 't'; } $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $pendingResults,$curTuple,$xfile) +1; } if ($paranoidrunner == 0) { logErrorMessage("D".$maxSeqId." of XID=".$XID." about to have empty SQL. Not commited.\n"); die; } # print $xfile "COMMIT;\n"; print $xfile "]]>"; #Now commit the transaction. updateMirrorHostTable($XID,$seqId,$isexpuncond); $pendingResults = undef; $curTransTuple = $curTransTuple +1; close ($xfile); # edw to copy sto parallhlo dir... if ($::slaveInfo->{"comtype"} eq "marinet") { system("cp /usr/local/var/PARALLEL_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."/"."D".$maxSeqId.".xml"." /usr/local/var/XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."\/\."); } else { if (! $batchTxMode) { #print "scp D$maxSeqId file\n"; #my $rrc=system("scp -Cq /usr/local/var/PARALLEL_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."/"."D".$maxSeqId.".xml"." 10.9.200.216:/usr/local/var/QUEUE_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."\/\."); my $rrc=system("scp -Cq /usr/local/var/PARALLEL_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."/"."D".$maxSeqId.".xml"." theseas.internal.net:/usr/local/var/QUEUE_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."\/\."); #my $rrc=system("cp /usr/local/var/PARALLEL_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."/"."D".$maxSeqId.".xml"." /usr/local/var/QUEUE_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."\/\."); unless ($rrc == 0) { logErrorMessage("scp to uucphub failed\n"); die; } } else { #print "cp D$maxSeqId file\n"; my $rrc=system("cp /usr/local/var/PARALLEL_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."/"."D".$maxSeqId.".xml"." /usr/local/var/QUEUE_XML_TO_VESSELS/".$::slaveInfo->{"VesselName"}."\/\."); unless ($rrc == 0) { logErrorMessage("cp to local QUEUE_XML_TO_VESSELS failed\n"); die; } } } }#while transactions left. if ($batchTxMode == 1) { #print "scp files to uucphub\n"; my $srcPath = "/usr/local/var/QUEUE_XML_TO_VESSELS/" . $::slaveInfo->{"VesselName"} . "/."; #my $dstPath = "10.9.200.216:/usr/local/var/QUEUE_XML_TO_VESSELS/" . $::slaveInfo->{"VesselName"} . "\/\."; my $dstPath = "theseas.internal.net:/usr/local/var/QUEUE_XML_TO_VESSELS/" . $::slaveInfo->{"VesselName"} . "\/\."; my $txCommand = "scp -rqC -o \"ConnectTimeout=5\" $srcPath $dstPath && rm -rf $srcPath/*"; ##rm with * may cause "Argument list too long" (but we'll have bigger problems if dbmirror produces so many files!) #my $txCommand = "scp -rqC -o \"ConnectTimeout=5\" $srcPath $dstPath && find $srcPath -type f -exec rm_frffrf -f {} \\;"; ##find exec doesn't return code! #my $rrc=system("scp -rqC -o \"ConnectTimeout=5\" /usr/local/var/QUEUE_XML_TO_VESSELS/" . $::slaveInfo->{"VesselName"} . "/. /usr/local/var/QUEUE_XML_TO_VESSELS/" . $::slaveInfo->{"VesselName"} . "\/\. && rm "); print "$txCommand\n"; #die; my $rrc=system($txCommand); print "rrc=$rrc\n"; unless ($rrc == 0) { logErrorMessage("batch scp to uucphub failed\n"); die; } } $pendingTransResults = undef; $setQuery2 = "COMMIT"; my $setResult2 = $masterConn->exec($setQuery2); if($setResult2->resultStatus!=PGRES_COMMAND_OK) { logErrorMessage($masterConn->errorMessage . "\n" . $setQuery2); die; } undef $masterConn; }#Main =item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple) Mirrors a single SQL Command(change to a single row) to the slave. =over 4 =item * SeqId The id number of the change to mirror. This is the primary key of the pending table. =item * tableName The name of the table the transaction takes place on. =item * op The type of operation this transaction is. 'i' for insert, 'u' for update or 'd' for delete. =item * transId The Transaction of of the Transaction that this command is part of. =item * pendingResults A Results set structure returned from Pg::execute that contains the join of the Pending and PendingData tables for all of the pending row edits in this transaction. =item * currentTuple The tuple(or row) number of the pendingRow for the command that is about to be edited. If the command is an update then this points to the row with IsKey equal to true. The next row, curTuple+1 is the contains the PendingData with IsKey false for the update. =item returns The tuple number of last tuple for this command. This might be equal to currentTuple or it might be larger (+1 in the case of an Update). =back =cut sub mirrorCommand($$$$$$$) { my $seqId = $_[0]; my $tableName = $_[1]; my $op = $_[2]; my $transId = $_[3]; my $pendingResults = $_[4]; my $currentTuple = $_[5]; my $xfout = $_[6]; if($op eq 'i') { $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults ,$currentTuple,$xfout); } if($op eq 'd') { $currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults, $currentTuple,$xfout); } if($op eq 'u') { $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, $currentTuple,$xfout); } $commandCount = $commandCount +1; if($commandCount % 100 == 0) { # print "Sent 100 commmands on SeqId $seqId \n"; # flush STDOUT; } return $currentTuple } =item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple) Mirrors an INSERT operation to the slave database. A new row is placed in the slave database containing the primary key from pendingKeys along with the data fields contained in the row identified by sourceOid. =over 4 =item * transId The sequence id of the INSERT operation being mirrored. This is the primary key of the pending table. =item * tableName The name of the table the transaction takes place on. =item * sourceOid The OID of the row in the master database for which this transaction effects. If the transaction is a delete then the operation is not valid. =item * transId The Transaction Id of transaction that this insert is part of. =item * pendingResults A Results set structure returned from Pg::execute that contains the join of the Pending and PendingData tables for all of the pending row edits in this transaction. =item * currentTuple The tuple(or row) number of the pendingRow for the command that is about to be edited. In the case of an insert this should point to the one row for the row edit. =item returns The tuple number of the last tuple for the row edit. This should be currentTuple. =back =cut sub mirrorInsert($$$$$$) { my $seqId = $_[0]; my $tableName = $_[1]; my $transId = $_[2]; my $pendingResults = $_[3]; my $currentTuple = $_[4]; my $xfout = $_[5]; my $counter; my $column; my $firstIteration=1; my %recordValues = extractData($pendingResults,$currentTuple); #Now build the insert query. my $insertQuery = "INSERT INTO $tableName ("; my $valuesQuery = ") VALUES ("; foreach $column (keys (%recordValues)) { if($firstIteration==0) { $insertQuery .= " ,"; $valuesQuery .= " ,"; } $insertQuery .= "\"$column\""; if(defined $recordValues{$column}) { my $quotedValue = $recordValues{$column}; $quotedValue =~ s/\\/\\\\/g; $quotedValue =~ s/'/''/g; $quotedValue =~ s/]]>/]]]]>/g; $valuesQuery .= "E'$quotedValue'"; } else { $valuesQuery .= "null"; } $firstIteration=0; } $valuesQuery .= ")"; print $xfout $insertQuery . $valuesQuery . ";"; return $currentTuple; } =item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple) Deletes a single row from the slave database. The row is identified by the primary key for the transaction in the pendingKeys table. =over 4 =item * SeqId The Sequence id for this delete request. =item * tableName The name of the table to delete the row from. =item * transId The Transaction Id of the transaction that this command is part of. =item * pendingResults A Results set structure returned from Pg::execute that contains the join of the Pending and PendingData tables for all of the pending row edits in this transaction. =item * currentTuple The tuple(or row) number of the pendingRow for the command that is about to be edited. In the case of a delete this should point to the one row for the row edit. =item returns The tuple number of the last tuple for the row edit. This should be currentTuple. =back =cut sub mirrorDelete($$$$$$) { my $seqId = $_[0]; my $tableName = $_[1]; my $transId = $_[2]; my $pendingResult = $_[3]; my $currentTuple = $_[4]; my $xfout = $_[5]; my %dataHash; my $currentField; my $firstField=1; %dataHash = extractData($pendingResult,$currentTuple); my $counter=0; my $deleteQuery = "DELETE FROM $tableName WHERE "; foreach $currentField (keys %dataHash) { if($firstField==0) { $deleteQuery .= " AND "; } my $currentValue = $dataHash{$currentField}; $deleteQuery .= "\""; $deleteQuery .= $currentField; if(defined $currentValue) { $deleteQuery .= "\"=E'"; $deleteQuery .= $currentValue; $deleteQuery .= "'"; } else { $deleteQuery .= " is null "; } $counter++; $firstField=0; } print $xfout $deleteQuery . ";"; return $currentTuple; } =item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple) Mirrors over an edit request to a single row of the database. The primary key from before the edit is used to determine which row in the slave should be changed. After the edit takes place on the slave its primary key will match the primary key the master had immediatly following the edit. All other fields will be set to the current values. Data integrity is maintained because the mirroring is performed in an SQL transcation so either all pending changes are made or none are. =over 4 =item * seqId The Sequence id of the update. =item * tableName The name of the table to perform the update on. =item * transId The transaction Id for the transaction that this command is part of. =item * pendingResults A Results set structure returned from Pg::execute that contains the join of the Pending and PendingData tables for all of the pending row edits in this transaction. =item * currentTuple The tuple(or row) number of the pendingRow for the command that is about to be edited. In the case of a delete this should point to the one row for the row edit. =item returns The tuple number of the last tuple for the row edit. This should be currentTuple +1. Which points to the non key row of the update. =back =cut sub mirrorUpdate($$$$$$) { my $seqId = $_[0]; my $tableName = $_[1]; my $transId = $_[2]; my $pendingResult = $_[3]; my $currentTuple = $_[4]; my $xfout = $_[5]; my $counter; my $quotedValue; my $updateQuery = "UPDATE $tableName SET "; my $currentField; my %keyValueHash; my %dataValueHash; my $firstIteration=1; #Extract the Key values. This row contains the values of the # key fields before the update occours(the WHERE clause) %keyValueHash = extractData($pendingResult,$currentTuple); #Extract the data values. This is a SET clause that contains #values for the entire row AFTER the update. %dataValueHash = extractData($pendingResult,$currentTuple+1); $firstIteration=1; foreach $currentField (keys (%dataValueHash)) { if($firstIteration==0) { $updateQuery .= ", "; } $updateQuery .= " \"$currentField\"="; my $currentValue = $dataValueHash{$currentField}; if(defined $currentValue ) { $quotedValue = $currentValue; $quotedValue =~ s/\\/\\\\/g; $quotedValue =~ s/'/''/g; $quotedValue =~ s/]]>/]]]]>/g; $updateQuery .= "E'$quotedValue'"; } else { $updateQuery .= "null "; } $firstIteration=0; } $updateQuery .= " WHERE "; $firstIteration=1; foreach $currentField (keys (%keyValueHash)) { my $currentValue; if($firstIteration==0) { $updateQuery .= " AND "; } $updateQuery .= "\"$currentField\"="; $currentValue = $keyValueHash{$currentField}; if(defined $currentValue) { $quotedValue = $currentValue; $quotedValue =~ s/\\/\\\\/g; $quotedValue =~ s/'/''/g; $updateQuery .= "E'$quotedValue'"; } else { $updateQuery .= " null "; } $firstIteration=0; } print $xfout $updateQuery . ";"; return $currentTuple+1; } =over 4 =item * seqId The sequence Id of the command being sent. Undef if no command is associated with the query being sent. =item * sqlQuery SQL operation to perform on the slave. =back =cut =item logErrorMessage(error) Mails an error message to the users specified $errorEmailAddr The error message is also printed to STDERR. =over 4 =item * error The error message to log. =back =cut sub logErrorMessage($) { my $error = $_[0]; if(defined $lastErrorMsg and $error eq $lastErrorMsg) { if($repeatErrorCount<$::errorThreshold) { $repeatErrorCount++; warn($error); return; } } $repeatErrorCount=0; if(defined $::errorEmailAddr) { my $mailPipe; open (mailPipe, "|mail -s DBMirror.pl $::errorEmailAddr"); print mailPipe "=====================================================\n"; print mailPipe " DBMirror.pl \n"; print mailPipe " " . $ARGV[0]. " \n"; print mailPipe "\n"; print mailPipe " The DBMirror.pl script has encountred an error. \n"; print mailPipe " It might indicate that either the master database has\n"; print mailPipe " gone down or that the connection to a slave database can\n"; print mailPipe " not be made. \n"; print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n"; print mailPipe "\n"; print mailPipe $error; print mailPipe "\n\n\n=================================================\n"; close mailPipe; } warn($error); $lastErrorMsg = $error; } sub setupSlave($) { my $slavePtr = $_[0]; $slavePtr->{"status"} = 0; #Determine the MirrorHostId for the slave from the master's database my $resultSet = $masterConn->exec('SELECT mh.MirrorHostId,replace(vsl.name,\' \',\'_\'),mh.comtype FROM ' . ' dbmirror_MirrorHost mh,vessels vsl WHERE mh.MirrorHostId=vsl.id and mh.HostName' . '=\'' . $slavePtr->{"slaveHost"} . '\''); if($resultSet->ntuples !=1) { my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; $errorMessage .= "Has no MirrorHost entry on master\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; die; } $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); $slavePtr->{"VesselName"} = $resultSet->getvalue(0,1); my $comtype = $resultSet->getvalue(0,2); if (($comtype ne "marinet" && $comtype ne "uucp")) { my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; $errorMessage .= "Has not set comtype\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; die; } $slavePtr->{"comtype"} = $comtype; $slavePtr->{"uucpnode"} = lc($slavePtr->{"VesselName"}); } =item updateMirrorHostTable(lastTransId,lastSeqId) Updates the MirroredTransaction table to reflect the fact that this transaction has been sent to the current slave. =over 4 =item * lastTransId The Transaction id for the last transaction that has been succesfully mirrored to the currently open slaves. =item * lastSeqId The Sequence Id of the last command that has been succefully mirrored =back =cut sub updateMirrorHostTable($$$) { my $lastTransId = shift; my $lastSeqId = shift; my $isexpuncond = shift; if ($isexpuncond eq 't') { my $deleteTransactionQuery; my $deleteResult; my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction "; $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)"; $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; my $updateResult = $masterConn->exec($updateMasterQuery); unless($updateResult->resultStatus == PGRES_COMMAND_OK) { my $errorMessage = $masterConn->errorMessage . "\n"; $errorMessage .= $updateMasterQuery; logErrorMessage($errorMessage); die; } # print "Updated slaves to transaction $lastTransId\n" ; # flush STDOUT; #If this transaction has now been mirrored to all mirror hosts #then it can be deleted. $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID=' . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction' . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM' . ' dbmirror_MirrorHost)'; $deleteResult = $masterConn->exec($deleteTransactionQuery); if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { logErrorMessage($masterConn->errorMessage . "\n" . $deleteTransactionQuery); die; } } else { my $deleteTransactionQuery; my $deleteResult; $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID='. $lastTransId.' AND slaveid='.$::slaveInfo->{"MirrorHostId"}; $deleteResult = $masterConn->exec($deleteTransactionQuery); if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { logErrorMessage($masterConn->errorMessage . "\n" . $deleteTransactionQuery); die; } } } sub extractData($$) { my $pendingResult = $_[0]; my $currentTuple = $_[1]; my $fnumber; my %valuesHash; $fnumber = 4; my $dataField = $pendingResult->getvalue($currentTuple,$fnumber); #print "in extract data!!!!!!!!!!!!!!\n"; $dataField =~ s/\\\\/\x11/g; $dataField =~ s/\\\'/\x12/g; while(length($dataField)>0) { # Extract the field name that is surronded by double quotes $dataField =~ m/(\".*?\")/s; my $fieldName = $1; $dataField = substr $dataField ,length($fieldName); $fieldName =~ s/\"//g; #Remove the surronding " signs. #print "fieldName=".$fieldName."\n"; #print "dataField (after fieldname)=".$dataField."\n"; if($dataField =~ m/(^= )/s) { #Matched null $dataField = substr $dataField , length($1); $valuesHash{$fieldName}=undef; # print "found null \n"; } elsif ($dataField =~ m/(^=\')/s) { #Has data. my $value; $dataField = substr $dataField ,2; #Skip the =' LOOP: { #This is to allow us to use last from a do loop. #Recommended in perlsyn manpage. do { my $matchString; my $matchString2; #Find the substring ending with the first ' or first \ $dataField =~ m/(^.*?\')/s; $matchString = $1; # edw pame na kanoune match eite olokliro to value, an yparxei, dil kati pou na teleiwnei # se kati diaforetiko apo \ kai meta ', dil na min einai escaped to ' (dil na min einai timi) # eite na matcharei apla to keno string '' #print "matchString:".$matchString." pos after ".pos($dataField)." length=".length($matchString)." \n"; $matchString2 = substr $matchString,0, length($matchString)-1; $matchString2 =~ s/\x11/\\/g; $matchString2 =~ s/\x12/\'/g; $value .= $matchString2; $dataField = substr $dataField,length($matchString)+1; # print "value:".$value."\n"; last; } until(length($dataField)==0); } $valuesHash{$fieldName} = $value; }#else if else { logErrorMessage "Error in PendingData Sequence Id " . $pendingResult->getvalue($currentTuple,0). "\n"; die; } } #while return %valuesHash; } --------------gblD1txa0httDB5gUF5yeuHa--