public inbox for [email protected]  
help / color / mirror / Atom feed
From: Achilleas Mantzios <[email protected]>
To: Tom Lane <[email protected]>
Cc: [email protected] <[email protected]>
Cc: [email protected]
Subject: Re: Postgresql 18beta1 and SPI changes
Date: Mon, 12 May 2025 13:50:12 +0100
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
	<[email protected]>

Dear All, Dear Tom

On 5/11/25 16:20, Tom Lane wrote:

> Achilleas Mantzios<[email protected]> writes:
>> We use are own version of DBmirror, we run our replication in a highly
>> fine grained manner. So every upgrade I have to make the code compile
>> and test. Up to PostgreSQL 17, I only got minor compilation problems
>> that I managed to resolve fairly easily. However this didn't prove to be
>> the case with PostgreSQL 18beta1, it proved harder to compile and as my
>> fears were verified, it has serious problems.
>> My question : is 18's SPI stabilized ? Can I start work on our version
>> of DBmirror ? Or wait for 18beta2 or -RC ?
> If you think there are changes we need to make, you'd better get
> specific sooner not later.  I'm not aware of any large fixes that
> are pending, cf
>
> https://wiki.postgresql.org/wiki/PostgreSQL_18_Open_Items

I attach

a) our old source (pending.c.orig), as of PostgreSQL 17 (tested for some 
7 months, so pretty well tested),

b) the compilation errors when compiled against 18beta1, and

c) the patch that I came up with, which seems (in my minimal testing) to 
yield correct results on 18beta1.

The majority of serious warnings have to do with de-toasting arrays and 
the PK's int2vector , while the error has to do with getting column 
details such as attisdropped and attname.

Please have a look, and share your thoughts. I haven't touched serious C 
coding till I first wrote the above sometime in 2004 with a bunch of 
additions some years ago.


>
> 			regards, tom lane
/****************************************************************************
 * pending.c
 * $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $
 *
 * This file contains a trigger for Postgresql-7.x to record changes to tables
 * to a pending table for mirroring.
 * All tables that should be mirrored should have this trigger hooked up to it.
 *
 *	 Written by Steven Singer ([email protected])
 *	 (c) 2001-2002 Navtech Systems Support Inc.
 *       ALL RIGHTS RESERVED
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 *
 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 *
 ***************************************************************************/
#include <string.h>

#include "postgres.h"

#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"
#define	Int2VectorSize(n)	(offsetof(int2vector, values) + (n) * sizeof(int16))

#define TRUE 1
#define FALSE 0

PG_MODULE_MAGIC;

enum FieldUsage
{
	PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};


int storePending(char *cpTableName, HeapTuple tBeforeTuple,
			 HeapTuple tAfterTuple,
			 TupleDesc tTupdesc,
			 Oid tableOid,
			 char cOp,int slaveid, char origOp);

int storexid(void);

int handler(char *cpTableName, HeapTuple tBeforeTuple,
			 HeapTuple tAfterTuple,
			 TupleDesc tTupdesc,
			 Oid tableOid,
			 char cOp, int slaveid, char *pkxpress);

int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);


int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);

int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);*/

int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);

int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tbaleOid,int iIncludeKeyData);

int2vector *getPrimaryKey(Oid tblOid);

char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
			enum FieldUsage eKeyUsage);

bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter);

char *get_namespace_name(Oid nspid);


#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
//#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);

PG_FUNCTION_INFO_V1(recordchange);


/*****************************************************************************
 * The entry point for the trigger function.
 * The Trigger takes a single SQL 'text' argument indicating the name of the
 * table the trigger was applied to.  If this name is incorrect so will the
 * mirroring.
 ****************************************************************************/
Datum
recordchange(PG_FUNCTION_ARGS)
{
	TriggerData *trigdata;
	TupleDesc	tupdesc;
	HeapTuple	beforeTuple = NULL;
	HeapTuple	afterTuple = NULL;
	HeapTuple	retTuple = NULL;
	char	   *tblname;
	char		op = 0;
	char 	   *schemaname;
	char	   *fullyqualtblname;
	char *pkxpress;
	if (fcinfo->context != NULL)
	{

		if (SPI_connect() < 0)
		{
			elog(NOTICE, "recordchange could not connect to SPI");
			return -1;
		}
		trigdata = (TriggerData *) fcinfo->context;
		/* Extract the table name */
		tblname = SPI_getrelname(trigdata->tg_relation);
#ifndef NOSCHEMAS
		schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation));
		fullyqualtblname = palloc(strlen(tblname) + 
					      strlen(schemaname) + 6);
 		sprintf(fullyqualtblname,"\"%s\".\"%s\"",
			schemaname,tblname);
#else
		fullyqualtblname = palloc(strlen(tblname) + 3);
		sprintf(fullyqualtblname,"\"%s\"",tblname);
#endif
		tupdesc = trigdata->tg_relation->rd_att;
		if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
		{
			retTuple = trigdata->tg_newtuple;
			beforeTuple = trigdata->tg_trigtuple;
			afterTuple = trigdata->tg_newtuple;
			op = 'u';

		}
		else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
		{
			retTuple = trigdata->tg_trigtuple;
			afterTuple = trigdata->tg_trigtuple;
			op = 'i';
		}
		else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
		{
			retTuple = trigdata->tg_trigtuple;
			beforeTuple = trigdata->tg_trigtuple;
			op = 'd';
		}

		if (storexid()) {
			elog(ERROR, "Operation could not be mirrored. storexid problem");
			return PointerGetDatum(NULL);
		}

		pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid);
		if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,retTuple->t_tableOid, op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress))
		{
			/* An error occoured. Skip the operation. */
			elog(ERROR, "Operation could not be mirrored");
			return PointerGetDatum(NULL);

		}
#if defined DEBUG_OUTPUT
		elog(NOTICE, "Returning on success");
#endif
		pfree(fullyqualtblname);
		pfree(pkxpress);
		SPI_finish();
		return PointerGetDatum(retTuple);
	}
	else
	{
		/*
		 * Not being called as a trigger.
		 */
		return PointerGetDatum(NULL);
	}
}

/*****************************************************************************
 * stores the current xid in dbmirror_xactions
 *****************************************************************************/

int
storexid(void) {
	//char	   *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES ($1)";
	char	   *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT $1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)";

	int		iResult = 0;

	Datum		saPlanData[1];
	Oid		taPlanArgTypes[1] = {INT4OID};
	void	   *vpPlan;

	vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes);
	if (vpPlan == NULL)
		elog(NOTICE, " storexid Error creating plan");

	saPlanData[0] = Int32GetDatum(GetCurrentTransactionId());

	iResult = SPI_execp(vpPlan, saPlanData, NULL , 1);
	if (iResult < 0) {
		elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase, iResult);
		return -1;
	}


#if defined DEBUG_OUTPUT
	elog(NOTICE, "row successfully stored in dbmirror_xactions");
#endif

	return 0;

}

/*****************************************************************************
 * Constructs and executes an SQL query to write a record of this tuple change
 * to the pending table.
 *****************************************************************************/
int
storePending(char *cpTableName, HeapTuple tBeforeTuple,
			 HeapTuple tAfterTuple,
			 TupleDesc tTupDesc,
			 Oid tableOid,
			 char cOp, int slaveid, char origOp)
{
	char	   *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)";

	int			iResult = 0;
	HeapTuple	tCurTuple;
	char nulls[5]="    ";

	//Points the current tuple(before or after)
		Datum		saPlanData[5];
	Oid			taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID, INT4OID, CHAROID};
	void	   *vpPlan;

	tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;



	vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes);
	if (vpPlan == NULL)
		elog(NOTICE, "Error creating plan");
	/* SPI_saveplan(vpPlan); */

	saPlanData[0] = PointerGetDatum(cpTableName);
	saPlanData[1] = CharGetDatum(cOp);
	saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
	saPlanData[3] = Int32GetDatum(slaveid);
	if (slaveid <=0) nulls[3]='n';
	saPlanData[4] = CharGetDatum(origOp);

	iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
	if (iResult < 0)
		elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);


#if defined DEBUG_OUTPUT
	elog(NOTICE, "row successfully stored in pending table");
#endif

	if (cOp == 'd')
	{
		/**
		 * This is a record of a delete operation.
		 * Just store the key data.
		 */
		iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
	}
	else if (cOp == 'i')
	{
		/**
		 * An Insert operation.
		 * Store all data
		 */
		iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE);

	}
	else
	{
		/* op must be an update. */
		iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
		iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE);
	}

#if defined DEBUG_OUTPUT
	elog(NOTICE, "Done storing keyinfo/data");
#endif

	return iResult;

}

int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
			 TupleDesc tTupleDesc, Oid tableOid)
{

	Oid			saPlanArgTypes[1] = {VARCHAROID};
	char	   *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
	void	   *pplan;
	Datum		saPlanData[1];
	char	   *cpKeyData;
	char	   *cpKeyData_tmp;
	int			iRetCode;

	pplan = SPI_prepare(insQuery, 1, saPlanArgTypes);
	if (pplan == NULL)
	{
		elog(NOTICE, "Could not prepare INSERT plan");
		return -1;
	}

	/* pplan = SPI_saveplan(pplan); */
	cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, PRIMARY);
	if (cpKeyData == NULL)
	{
		elog(ERROR,"Could not determine primary key data");
		return -1;
	}
#if defined DEBUG_OUTPUT
	elog(NOTICE, "KeyData: %s", cpKeyData);
#endif
	cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
	memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
	SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
	saPlanData[0] = PointerGetDatum(cpKeyData_tmp);

	iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);

	if (cpKeyData != NULL)
		pfree(cpKeyData);
	if (cpKeyData_tmp != 0)
		pfree(cpKeyData_tmp);

	if (iRetCode != SPI_OK_INSERT)
	{
		elog(NOTICE, "Error inserting row in storeKeyInfo");
		return -1;
	}
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Insert successful");
#endif

	return 0;

}




int2vector *
getPrimaryKey(Oid tblOid)
{
	char	   *queryBase;
	char	   *query;
	bool		isNull;
	int2vector *resultKey;
	int2vector *tpResultKey;
	HeapTuple	resTuple;
	Datum		resDatum;
	int			ret;

	queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
	query = palloc(strlen(queryBase) + MAX_OID_LEN + 1);
	sprintf(query, "%s%d", queryBase, tblOid);
	ret = SPI_exec(query, 1);
	if (ret != SPI_OK_SELECT || SPI_processed != 1)
	{
		elog(NOTICE, "Could not select primary index key");
		return NULL;
	}

	resTuple = SPI_tuptable->vals[0];
	resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull);

	if (isNull) {
		elog(NOTICE, "PKey is NULL");
		return NULL;
	}

	tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
	int n=tpResultKey->dim1;
	resultKey = palloc(Int2VectorSize(n));
	if (n > 0)
		memcpy(resultKey->values, tpResultKey->values, n * sizeof(int16));

	SET_VARSIZE(resultKey, Int2VectorSize(n));
	resultKey->ndim = 1;
	resultKey->dataoffset = 0;
	resultKey->elemtype = INT2OID;
	resultKey->dim1 = n;
	resultKey->lbound1 = 0;

	pfree(query);
	return resultKey;
}

/******************************************************************************
 * Stores a copy of the non-key data for the row.
 *****************************************************************************/
int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
{

	Oid			planArgTypes[1] = {VARCHAROID};
	char	   *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
	SPIPlanPtr pplan;
	Datum		planData[1];
	char	   *cpKeyData;
	char	   *cpKeyData_tmp;
	int			iRetValue;

	pplan = SPI_prepare(insQuery, 1, planArgTypes);
	if (pplan == NULL)
	{
		elog(NOTICE, "Could not prepare INSERT plan");
		return -1;
	}

	/* pplan = SPI_saveplan(pplan); */
	if (iIncludeKeyData == 0)
		cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc, tableOid, NONPRIMARY);
	else
		cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, ALL);

	cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
	memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
        SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
	planData[0] = PointerGetDatum(cpKeyData_tmp);

	iRetValue = SPI_execp(pplan, planData, NULL, 1);

	if (cpKeyData != 0)
		pfree(cpKeyData);
	if (cpKeyData_tmp != 0)
		pfree(cpKeyData_tmp);

	if (iRetValue != SPI_OK_INSERT)
	{
		elog(NOTICE, "Error inserting row in storeData");
		return -1;
	}
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Insert successful");
#endif

	return 0;

}

/**
 * Packages the data in tTupleData into a string of the format
 * FieldName='value text'  where any quotes inside of value text
 * are escaped with a backslash and any backslashes in value text
 * are esacped by a second back slash.
 *
 * tTupleDesc should be a description of the tuple stored in
 * tTupleData.
 *
 * eFieldUsage specifies which fields to use.
 *	PRIMARY implies include only primary key fields.
 *	NONPRIMARY implies include only non-primary key fields.
 *	ALL implies include all fields.
 */
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
			enum FieldUsage eKeyUsage)
{
	int			iNumCols;
	int2vector *tpPKeys = NULL;
	int			iColumnCounter;
	char	   *cpDataBlock;
	int			iDataBlockSize;
	int			iUsedDataBlock;

	iNumCols = tTupleDesc->natts;

	if (eKeyUsage != ALL)
	{
		tpPKeys = getPrimaryKey(tableOid);
		if (tpPKeys == NULL)
			return NULL;
	}
#if defined DEBUG_OUTPUT
	if (tpPKeys != NULL)
		elog(NOTICE, "Have primary keys");
#endif
	cpDataBlock = palloc(BUFFER_SIZE);
	iDataBlockSize = BUFFER_SIZE;
	iUsedDataBlock = 0;			/* To account for the null */

	for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
	{
		int			iIsPrimaryKey;
		int			iPrimaryKeyIndex;
		char	   *cpUnFormatedPtr;
		char	   *cpFormatedPtr;

		char	   *cpFieldName;
		char	   *cpFieldData;

		if (eKeyUsage != ALL)
		{
			/* Determine if this is a primary key or not. */
			iIsPrimaryKey = 0;
			int16 *tpPKeysV = tpPKeys->values;
			int tpPKeysSize = tpPKeys->dim1;
			for (iPrimaryKeyIndex = 0; iPrimaryKeyIndex<tpPKeysSize;
				 iPrimaryKeyIndex++)
			{
				if (tpPKeysV[iPrimaryKeyIndex] == iColumnCounter)
				{
					iIsPrimaryKey = 1;
					break;
				}
			}
			if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
			{
				/**
				 * Don't use.
				 */
#if defined DEBUG_OUTPUT
				elog(NOTICE, "Skipping column");
#endif
				continue;
			}
		}						/* KeyUsage!=ALL */
#ifndef  NODROPCOLUMN
               // this comment is for 11
               if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
               //if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
                 {
                   /**
                    * This column has been dropped.
                    * Do not mirror it.
                    */
                   continue;
                 }
#endif
/***************************************
Edw mpainei o kwdikas gia elegxo enanti,
tou dbmirror_exclude_attributes, gia
eksairetea columns
****************************************/
		if (isExcluded(cpTableName,tTupleDesc,iColumnCounter)) {
			continue;
		}

		// this comment is for 11
		cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
		//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
		elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
		while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
		{
			cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
			iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
		}
		sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
		iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
		cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);

		cpUnFormatedPtr = cpFieldData;
		cpFormatedPtr = cpDataBlock + iUsedDataBlock;
		if (cpFieldData != NULL)
		{
			*cpFormatedPtr = '\'';
			iUsedDataBlock++;
			cpFormatedPtr++;
		}
		else
		{
			sprintf(cpFormatedPtr," ");
			iUsedDataBlock++;
			cpFormatedPtr++;
			continue;

		}
#if defined DEBUG_OUTPUT
		elog(NOTICE, "FieldData: %s", cpFieldData);
		elog(NOTICE, "Starting format loop");
#endif
		while (*cpUnFormatedPtr != 0)
		{
			while (iDataBlockSize - iUsedDataBlock < 2)
			{
				cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
				iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
				cpFormatedPtr = cpDataBlock + iUsedDataBlock;
			}
			if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'')
			{
				*cpFormatedPtr = '\\';
				cpFormatedPtr++;
				iUsedDataBlock++;
			}
			*cpFormatedPtr = *cpUnFormatedPtr;
			cpFormatedPtr++;
			cpUnFormatedPtr++;
			iUsedDataBlock++;
		}

		pfree(cpFieldData);

		while (iDataBlockSize - iUsedDataBlock < 3)
		{
			cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
			iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
			cpFormatedPtr = cpDataBlock + iUsedDataBlock;
		}
		sprintf(cpFormatedPtr, "' ");
		iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
		elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif

	}							/* for iColumnCounter  */
	if (tpPKeys != NULL)
		pfree(tpPKeys);
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Returning: DataBlockSize:%d iUsedDataBlock:%d",iDataBlockSize,
			iUsedDataBlock);
#endif
	memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);

	return cpDataBlock;

}





bool isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber) {
	char *qb1;
	char *qb2;
	char *q;
	char *fieldName;
	int ret;
	HeapTuple resTuple;
	SPITupleTable *SIDKEY_tupTable;
	int     SIDKEY_processed;
	char *value;
#if defined DEBUG_OUTPUT
	elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
	fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
	elog(NOTICE, "fieldName=%s",fieldName);
#endif
	qb1 = "SELECT ";
	qb2 = " = any(attnames) from dbmirror_exclude_attributes where tblname=";
#if defined DEBUG_OUTPUT
	elog(NOTICE, "%s q size=%d",fieldName,strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2);
#endif
/*
to +1 parakatw einai gia to 0 (null)
*/
	q = palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
	sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	pfree(fieldName);

	if (ret != SPI_OK_SELECT || SPI_processed != 1) return FALSE;
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (value == NULL) return FALSE;

	if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0) {
		pfree(value);
		return TRUE;
	} 
	else {
		pfree(value);
		return FALSE;
	}
}




#define MAXKCOLS 32
#define MAXCOL_LEN 128
int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char *pkxpress) {

	HeapTuple       tuple;
	tuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
#if defined DEBUG_OUTPUT
	elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d, op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress);
#endif

	if (slaveid == 0 && op == 'm') {
		elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen");
		return -1;
	}
	/*
	ISEXPUNCOND
	*/
	else if (slaveid == 0) return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
	/*
	ISEXPWITHSLAVEID
	*/
	else if (slaveid > 0 && op =='d') {
		int retval;
		retval =  storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
		return updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid);
	}
	else if (slaveid > 0 && op == 'i') {
		int retval;
		retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
		retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
		return retval;
	}
	else if (slaveid > 0 && op == 'u') {
		int retval;
		int old_slaveid;
	 	old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
		retval = handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
		if (old_slaveid != -3 && old_slaveid != slaveid) {
			retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
		}
		if (old_slaveid != slaveid) {
			retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
		}
		else {
			retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
		}
		if (old_slaveid != -3) {
			retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
		}
		return retval;
	}
	else if (slaveid == -3 && (op =='d' || op =='i')) {
		return 0;
	}
	else if (slaveid == -3 && op =='u') {
		int retval=0;
		int old_slaveid;
	 	old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
		if (old_slaveid > 0) {
			retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
			retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
		}
		return retval;
	}
	/*
	ISIMPLBACKWARD KALESMENO apo handleParents
	*/
	else if (slaveid >0 && op == 'm') {
		if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) {
		return 0;
/*
SE SXOLIO Giati twra mporei enas pateras ISEXPUNCOND/ISEXPWITHSLAVEID na exei paidi ISIMPLBACKWARD
*/
/*
			elog(ERROR,"Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen");
*/
		}
		if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) {
		return 0;
/*
*/
		}
		if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) {
		return 0;
		}
		if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0;
		else {
			int retval;
			retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
			retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
			return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress);
		}
	}
	/*
	ISIMPL* but called from trigger ('i','d','u')
	*/

	/*
	ISIMPLBACKWARD
	*/
	else if (slaveid == -1) {
		if (op == 'i') return 0;
		/*
		Delete in this fashion may happen for "orphan" rows.
		The deletions will be "triggered" by childern deletions/updates.
		*/
		else if (op == 'd') {
			int retval=0;
			int *slaves;
			int *run;
			slaves = getSlaves(cpTableName,pkxpress);
			if (slaves == NULL) return 0;
			for (run=slaves;*run;run++) {
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
				retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run);
			}

			pfree(slaves);
			return retval || deleteAccnt(cpTableName,pkxpress); 

		}
		else if (op == 'u') {
			int retval=0;
			int *slaves;
			int *run;

			slaves = getSlaves(cpTableName,pkxpress);
#if defined DEBUG_OUTPUT
				elog(NOTICE,"in slaveid=-1, op='u', slaves=%d",(int)slaves);
#endif
			if (slaves == NULL) return 0;
			for (run=slaves;*run;run++) {
#if defined DEBUG_OUTPUT
				elog(NOTICE,"in slaveid=-1, op='u', runslave=%d",*run);
#endif
				retval = retval || handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
				retval = retval || updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
			}
			pfree(slaves);
			return retval;
		}
	}
	/*
	ISIMPLFORWARD
	*/
	else if (slaveid == -2) {
		if (op == 'i') {
			int retval=0;
			int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);
			if (forw_slaveid>0) {
				retval = retval || handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
			}
			return retval;
		}
		else if (op == 'd') {
			int retval=0;
			int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);

			if (forw_slaveid>0) {
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
				retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid);
			}
			return retval ; 

		}
		else if (op == 'u') {
/*
POTE ALLAGH TOU VALUE TOU COLUMN NAME TOU PATH dhl tou COLUMN ME NAME PATHCOL
*/
			int retval=0;

			int forw_slaveid = getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc);
			int old_slaveid = getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc);
			//char origop = getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc);
			//if (origop == 'i') {
			//	old_slaveid = forw_slaveid;
			//}
			//elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' , forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid);
			// LEGACY CODE as of 2016-03-18
			//retval = retval || handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
			//retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid);
			//retval = retval || updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
			if (forw_slaveid>0) {
				retval = handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
				if (old_slaveid != -3 && old_slaveid != forw_slaveid) {
					retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
				}
				if (old_slaveid != forw_slaveid) {
					retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op);
				}
				else {
					retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
				}
				if (old_slaveid != -3) {
					retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
				}
			}
			else { // ISIMPLFORWARD me vslid IS NULL --> -3
				if (old_slaveid > 0) {
					retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
					retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
				}
			}
			return retval;
		}
	}
	return 0;
}

int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
	char *slave_siteidkeyname;
	char *qb;
	char *q;
	char *foo;
	int ret;
	HeapTuple resTuple;
	int attnum;
	int slaveid;
	SPITupleTable *SIDKEY_tupTable;
	int     SIDKEY_processed;
#if defined DEBUG_OUTPUT
	elog(NOTICE, "in getSlaveId of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
		/*
		ISIMPLBACKWARD
		*/
	if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1;
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
		/*
		ISEXPUNCOND
		*/
	if (slave_siteidkeyname == NULL) return 0;
		/*
		ISIMPLFORWARD
		*/
	if (strncmp(slave_siteidkeyname,"___",3) == 0) {
		pfree(slave_siteidkeyname);
		return -2;
	}
	attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		elog(ERROR,"WRONG slaveidkeyname=%s given for table %s",slave_siteidkeyname,cpTableName);
		pfree(slave_siteidkeyname);
		return 0;
	}
	foo=SPI_getvalue(tuple,tupleDesc,attnum);
		/*
		ISEXPWITHSLAVEID BUT siteidkeyname is null
		*/
	if (foo == NULL) {
		pfree(slave_siteidkeyname);
		return -3;
	}
	pfree(slave_siteidkeyname);
	slaveid = atoi(foo);
	pfree(foo);
		/*
		ISEXPWITHSLAVEID
		*/
	return slaveid;
}

int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getComputedSlaveId of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (slave_siteidkeyname == NULL)  {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	if (strncmp(slave_siteidkeyname,"___",3) != 0) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
//	elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
	path = slave_siteidkeyname + 3;
//	elog(NOTICE,"path=-%s-",path);
	run = index(path,'_');
	if (run == NULL) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	pathcol = palloc(MAXCOL_LEN);
//	elog(NOTICE,"path=-%d-",(unsigned)path);
//	elog(NOTICE,"run=-%d-",(unsigned)run);
//	elog(NOTICE,"run-path=-%d-",run-path);
	*run = 0;
	strncpy(pathcol,path,run-path+1);

//	elog(NOTICE,"pathcol=-%s-",pathcol);

	forw_table = run+1;

	attnum = SPI_fnumber(tupleDesc,pathcol);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
		return 0;
        }
	pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
	
	/*
	forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
	*/

	run = index(forw_table,'@');
	if (run == NULL) {
		forw_table_id = palloc(MAXCOL_LEN);
		strncpy(forw_table_id,"id",3);
	}
	else {
		forw_table_id = palloc(MAXCOL_LEN);
		*run=0;
		strncpy(forw_table_id,forw_table,run-forw_table+1);
		forw_table=run+1;
	}

	q = palloc(512);
	sprintf(q,"SELECT * FROM public.%s where %s='%s'",forw_table,forw_table_id,pathcolval);
	ret = SPI_exec(q,1);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		pfree(slave_siteidkeyname);
		elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should have row with %s=%s",forw_table,forw_table_id,pathcolval);
		return -2;
	}
	pfree(q);
	RealPar_tupTable = SPI_tuptable;
	RealPar_tuple = RealPar_tupTable->vals[0];
	ParTableName = palloc(256);
	sprintf(ParTableName,"\"public\".\"%s\"",forw_table);
	ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
	if (ret == -2)  {
		pfree(slave_siteidkeyname);
		ret = getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
		pfree(ParTableName);
		return ret;
	}
	if (ret <= 0) {
		pfree(slave_siteidkeyname);
		pfree(ParTableName);
//		elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should have slaveid>0",forw_table);
/***
		apopiera xeirismou NULL slaveid, dhl isodynamo ISEXPWITHSLAVEID kai slaveid IS NULL
***/
		return -3;
	}
	pfree(ParTableName);
	pfree(slave_siteidkeyname);
	return ret;
/*
PROSOXH:
To swsto einai apla sto slave_siteidkeyname na fylame MONO to parent table, kai meta
me query na briskoume to tuple tou parent table.
*/
}

int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (slave_siteidkeyname == NULL)  {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	if (strncmp(slave_siteidkeyname,"___",3) != 0) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
//	elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
	path = slave_siteidkeyname + 3;
//	elog(NOTICE,"path=-%s-",path);
	run = index(path,'_');
	if (run == NULL) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	pathcol = palloc(MAXCOL_LEN);
//	elog(NOTICE,"path=-%d-",(unsigned)path);
//	elog(NOTICE,"run=-%d-",(unsigned)run);
//	elog(NOTICE,"run-path=-%d-",run-path);
	*run = 0;
	strncpy(pathcol,path,run-path+1);

//	elog(NOTICE,"pathcol=-%s-",pathcol);

	forw_table = run+1;

	attnum = SPI_fnumber(tupleDesc,pathcol);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
		return 0;
        }
	pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
	
	/*
	forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
	*/

	run = index(forw_table,'@');
	if (run == NULL) {
		forw_table_id = palloc(MAXCOL_LEN);
		strncpy(forw_table_id,"id",3);
	}
	else {
		forw_table_id = palloc(MAXCOL_LEN);
		*run=0;
		strncpy(forw_table_id,forw_table,run-forw_table+1);
		forw_table=run+1;
	}
/*
consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
an parent's op=='d' tote sigoura parent's origop=='u'
*/
	q = palloc(512);
	sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
		if (foo == NULL) {
			slaveid_d = -3;
		}
		else {
			slaveid_d = atoi(foo);
			pfree(foo);
		}
		pfree(q);
		pfree(pathcolval);
		return slaveid_d;
	}
/* dirty patch code to handle copy supplycase. it was supposed to solve the problem of not turning 'u' to 'i' for e.g supplycasesquotes */ 
/* but it doesnt work for fb_ , as it turns 'i' to 'u' as per : dbmirror issues when changing vslid (null->int, int->null, intX->intY) */ 
/* following code must be here (not commended) as of 2018-12-20 */
/* the aim is make everything work correctly, supplies + fb_ + all type ISIMPLFORWARD (-2) tables */
/*

	q = palloc(512);
	sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",cpTableName,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
		if (foo == NULL) {
			slaveid_d = -3;
		}
		else {
			slaveid_d = atoi(foo);
			pfree(foo);
		}
		pfree(q);
		pfree(pathcolval);
		return slaveid_d;
	}
*/

/**
pls consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing

if (parent op=='i' and origop <> 'i' exist) -pou simainei oti origop='u' kai (eite slaveid NULL -> NOT NULL eite X -> Y, X!=Y) - we consider this as an insert and return -3 in order to communicate this to handler().
NOTE, this means that it is ILLEGAL for the app to do UPDATE on the parent, then update on the kid, and then perform a second UPDATE on the kid, this will result in TWO inserts and produce an duplicate ERROR.
else - diladi origop == 'i' or no parent op=='i' exists - pou simainei : (
eite parent origop == 'i' (synepagetai oti parent op == 'i') 
eite parent op != 'i' : (synepagetai : 
	eite parent op == 'd' opote den tha ftasei edo o kwdikas, logw tou 1ou statement pio pano
	eite parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) me X==Y )
)
)
 then we let the code fallback to return getComputedSlaveId(cpTableName,tuple,tupleDesc) which will return forw_slaveid .
**/
	q = palloc(512);
	sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		pfree(slave_siteidkeyname);
		pfree(pathcolval);
		return -3;
	}

	pfree(slave_siteidkeyname);
	pfree(pathcolval);
	return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */

/*
* PROYPOTHETOUME OTI O AMESOS BABAS exei entry sto dbmirror_pending me to idio xid kai op='d'
*/
}

/*
char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
char origop;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (slave_siteidkeyname == NULL)  {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	if (strncmp(slave_siteidkeyname,"___",3) != 0) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
//	elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
	path = slave_siteidkeyname + 3;
//	elog(NOTICE,"path=-%s-",path);
	run = index(path,'_');
	if (run == NULL) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	pathcol = palloc(MAXCOL_LEN);
//	elog(NOTICE,"path=-%d-",(unsigned)path);
//	elog(NOTICE,"run=-%d-",(unsigned)run);
//	elog(NOTICE,"run-path=-%d-",run-path);
	*run = 0;
	strncpy(pathcol,path,run-path+1);

//	elog(NOTICE,"pathcol=-%s-",pathcol);

	forw_table = run+1;

	attnum = SPI_fnumber(tupleDesc,pathcol);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
		return 0;
        }
	pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
*/	
	/*
	forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
	*/
/*

	run = index(forw_table,'@');
	if (run == NULL) {
		forw_table_id = palloc(MAXCOL_LEN);
		strncpy(forw_table_id,"id",3);
	}
	else {
		forw_table_id = palloc(MAXCOL_LEN);
		*run=0;
		strncpy(forw_table_id,forw_table,run-forw_table+1);
		forw_table=run+1;
	}

	q = palloc(512);
	sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
		if (foo == NULL) {
			pfree(q);
			pfree(pathcolval);
			elog(ERROR,"WRONG forward parent table %s has null origop in this transaction",forw_table);
		}
		else {
			origop = foo[0];
			pfree(foo);
		}
		pfree(q);
		pfree(pathcolval);
		return origop;
	}
	return ' ';
}
*/

#define MAX_WHERE_CLAUSE	512
#define MAX_QUERY_LEN	512
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
	char *qb;
	char *q;
	bool isNull;
	int ret;
	int i;
	HeapTuple resTuple;
	Datum resDatum;
	Oid confrelid;
	char *confrelname;
	int16	*thisCols;
	int16	*fkCols;
	int16	*run;
	ArrayType *arr;
	char	AthisColsVals[MAXKCOLS][MAXCOL_LEN];
	char	fkColsNames[MAXKCOLS][MAXCOL_LEN];
	char	thisColsTypes[MAXKCOLS][100];
	short	numOfCols;
	SPITupleTable *FK_tupTable;
	int	FK_processed;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in handleParents of Table=%d",tableOid);
#endif

	qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
	q = palloc(strlen(qb)+MAX_OID_LEN+1);
	sprintf(q,"%s%d",qb,tableOid);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed < 0) { 
		elog(NOTICE, "no FKs");
		return 0;
	}
	/*
	For every FK dependency we track and handle the parent table
	*/
	FK_tupTable = SPI_tuptable;
	FK_processed = SPI_processed;

	for (i=0;i<FK_processed;i++) {
		char *WHERE;
		char *ParTableName;
		int j;
		char FkHasNullValueORXcluded=0;
		char handleit=0;

		SPITupleTable *RealPar_tupTable;
		int     RealPar_processed;
		HeapTuple RealPar_tuple;
		int colrun;
		SPITupleTable *thisatt_tupTable;
		int     thisatt_processed;
		HeapTuple thisatt_tuple;

		resTuple = FK_tupTable->vals[i];


		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
		confrelid = (Oid) DatumGetObjectId(resDatum);

		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		thisCols = (int16 *)ARR_DATA_PTR(arr);
		numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
		int DIM_0 = ARR_DIMS(arr)[0];
		int LBOUND_0 = ARR_LBOUND(arr)[0];
//		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
//		confrelname = (char *) DatumGetName(resDatum);
//		elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0);
#endif
		for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
			int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
			char *fooname;

			q = palloc(512);
			sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
			ret = SPI_exec(q,1);
			if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		                elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
				return -2;
			}
			pfree(q);
			thisatt_processed = SPI_processed;
			thisatt_tupTable = SPI_tuptable;
			thisatt_tuple = thisatt_tupTable->vals[0];
			fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
			char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
			char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));

			

#if defined DEBUG_OUTPUT
			elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s",fooname,*run,thiscolrun,value);
#endif
			if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun))  {
				FkHasNullValueORXcluded=1;
				break;
			}
			memcpy(&(AthisColsVals[colrun][0]),value,strlen(value)+1);
			memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
			
			if (Btuple != NULL) {
				char *Bvalue = SPI_getvalue(Btuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
				elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s,Before value=%s",fooname,*run,thiscolrun,value,Bvalue);
#endif
				if (Bvalue == NULL) handleit=1;
				else if (strcmp(Bvalue,value)) {
					handleit=1;
					pfree(Bvalue);
				}

			}
			else handleit=1;
			pfree(value);
		}
#if defined DEBUG_OUTPUT
		elog(NOTICE,"in handleParents--> END OF ATTR LOOP");
		elog(NOTICE,"handle it = %d, ",handleit);
		elog(NOTICE,"FkHasNullValueORXcluded  = %d, ",FkHasNullValueORXcluded);
#endif
		if (FkHasNullValueORXcluded || !handleit) continue;
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		fkCols = (int16 *)ARR_DATA_PTR(arr);
		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
			SPITupleTable *PAR_tupTable;
			int     PAR_processed;
			HeapTuple PAR_resTuple;
			char *value;
			q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
			sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
			ret = SPI_exec(q,1);
			if (ret != SPI_OK_SELECT || SPI_processed != 1) {
				elog(ERROR, "Fatal Error:no PK for FK relation");
				return -2;
			}
			pfree(q);
			PAR_tupTable = SPI_tuptable;
			PAR_processed = SPI_processed;
			PAR_resTuple = PAR_tupTable->vals[0];
			value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
			memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
			pfree(value);

		}
		
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
		confrelname = (char *) DatumGetName(resDatum);
	/*
	There must be at least 1 column in compound FK!!
	*/
		WHERE = palloc(MAX_WHERE_CLAUSE);
		if (!strncmp(thisColsTypes[0],"int",3))
			sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]);
		else
			sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]);
		for (j=1;j<numOfCols;j++) {
			if (!strncmp(thisColsTypes[j],"int",3))
				sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],AthisColsVals[j]);
			else
				sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],AthisColsVals[j]);
		}

		q = palloc(512);
		sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
		//elog(NOTICE,"handleParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
		elog(NOTICE,"Found FK, SQL=%s",q);
#endif
		ret = SPI_exec(q,1);
		if (ret != SPI_OK_SELECT || SPI_processed != 1) {
	                elog(ERROR, "Fatal error: handleParents: par table %s should have row with %s",confrelname,WHERE);
			return -2;
		}
		pfree(q);
		RealPar_processed = SPI_processed;
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		ParTableName = palloc(256);
		sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
		ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE);
		pfree(ParTableName);
		pfree(WHERE);

	}
	return 0;
}

int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	/*
	sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0;
	else return 1;
	*/
	sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0;
	else return 1;
}

int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) {
	char *q;
	int ret;
	bool isNull;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
	return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull));
}

int createAccnt(char *cpTableName, int slaveid, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in createAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	/*
	public.dbmirror_accounting.cnt DEFAULT = 1
	*/
	sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2;
	else return 0;
}


char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) {
	int2vector *pk;
	int i;
	char *WHERE = palloc(MAX_WHERE_CLAUSE);

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in getPKxpress of Table=%s",cpTableName);
#endif

	pk = getPrimaryKey(tableOid);
	if (pk == NULL) return NULL;
	int16 *pkV = pk->values;
	int pkSize = pk->dim1;
	if (pkSize>0) {
		char *keyname;
		char *keyval;
		keyname = SPI_fname(tupleDesc,pkV[0]);
		if (keyname == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[0],cpTableName);
			return NULL;
		}
		keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]);
		if (keyval == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[0],cpTableName);
			return NULL;
		}

		sprintf(WHERE,"\"%s\"=%s",keyname,keyval);
		pfree(keyname);
		pfree(keyval);
	}
	for (i=1;i<pkSize;i++) {
		char *keyname;
		char *keyval;
		keyname = SPI_fname(tupleDesc,pkV[i]);
		if (keyname == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[i],cpTableName);
			return NULL;
		}
		keyval = SPI_getvalue(tuple,tupleDesc,pkV[i]);
		if (keyval == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[i],cpTableName);
			return NULL;
		}

		sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,keyname,keyval);
		pfree(keyname);
		pfree(keyval);
		
	}
	pfree(pk);
	return WHERE;
}
int *getSlaves(char *cpTableName,char *pkxpress) {
	char *q;
	int ret;
	int SLAVE_processed;
	SPITupleTable *SLAVE_tupTable;
	int *slaves;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in getSlaves of Table=%s, pkxpress=%s",cpTableName,pkxpress);
#endif

	if (pkxpress == NULL) return NULL;
	
	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL;
	SLAVE_tupTable = SPI_tuptable;
	SLAVE_processed = SPI_processed;
	slaves = palloc((SLAVE_processed+1) * sizeof(int));
#if defined DEBUG_OUTPUT
	elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed);
#endif
	for (ret=0;ret<SLAVE_processed;ret++) {
		char *slaveidStr = SPI_getvalue(SLAVE_tupTable->vals[ret],SLAVE_tupTable->tupdesc,1);
		*(slaves+ret) = atoi(slaveidStr);
		pfree(slaveidStr);
		
	}
	*(slaves+SLAVE_processed) = 0;
	return slaves;


}

int deleteAccnt (char *cpTableName, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret == SPI_OK_DELETE) return 0;
	else return -2;
}

int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret == SPI_OK_DELETE) return 0;
	else return -2;
}

int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
	char *qb;
	char *q;
	bool isNull;
	int ret;
	int i;
	HeapTuple resTuple;
	Datum resDatum;
	Oid confrelid;
	char *confrelname;
	int16	*thisCols;
	int16	*fkCols;
	int16	*run;
	ArrayType *arr;
	char	BthisColsVals[MAXKCOLS][MAXCOL_LEN];
	char	fkColsNames[MAXKCOLS][MAXCOL_LEN];
	char	thisColsTypes[MAXKCOLS][100];
	short	numOfCols;
	SPITupleTable *FK_tupTable;
	int	FK_processed;
	int ParSlaveId;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid);
#endif

	qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
	q = palloc(strlen(qb)+MAX_OID_LEN+1);
	sprintf(q,"%s%d",qb,tableOid);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed < 0) { 
		elog(NOTICE, "no FKs");
		return 0;
	}
	/*
	For every FK dependency we track and handle the parent table
	*/
	FK_tupTable = SPI_tuptable;
	FK_processed = SPI_processed;

	for (i=0;i<FK_processed;i++) {
		char *WHERE;
		char *ParTableName;
		int j;
		char FkHasNullValueORXcluded=0;
		char decreaseit=0;

		SPITupleTable *RealPar_tupTable;
		int     RealPar_processed;
		HeapTuple RealPar_tuple;
		int colrun;
		SPITupleTable *thisatt_tupTable;
                int     thisatt_processed;
                HeapTuple thisatt_tuple;


		resTuple = FK_tupTable->vals[i];


		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
		confrelid = (Oid) DatumGetObjectId(resDatum);

		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		thisCols = (int16 *)ARR_DATA_PTR(arr);
		numOfCols=ARR_DIMS(arr)[0];
		for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
			int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
                        char *fooname;

                        q = palloc(512);
                        sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
                        ret = SPI_exec(q,1);
                        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                                elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
                                return -2;
                        }
                        pfree(q);
                        thisatt_processed = SPI_processed;
                        thisatt_tupTable = SPI_tuptable;
                        thisatt_tuple = thisatt_tupTable->vals[0];
                        fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
                        char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));

			char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));



#if defined DEBUG_OUTPUT
			elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s",fooname,*run,thiscolrun,value);
#endif
			if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun))  {
				FkHasNullValueORXcluded=1;
				break;
				/*
				Nothing is done regarding "Before" status of parent since there is not one
				*/
			}
			memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1);
			memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);

			if (Atuple != NULL) {
				char *Avalue = SPI_getvalue(Atuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
				elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s,After value=%s",fooname,*run,thiscolrun,value,Avalue);
#endif
				if (Avalue == NULL) decreaseit=1;
				else if (strcmp(Avalue,value)) {
					decreaseit=1;
					pfree(Avalue);
				}
			}
			else decreaseit=1; /* is delete operation*/
			pfree(value);
		}
#if defined DEBUG_OUTPUT
		elog(NOTICE,"in updateAccnParents--> END OF ATTR LOOP");
		elog(NOTICE,"decrease it = %d, ",decreaseit);
		elog(NOTICE,"FkHasNullValueORXcluded  = %d, ",FkHasNullValueORXcluded);
#endif

		if (FkHasNullValueORXcluded || !decreaseit) continue;


		
		
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		fkCols = (int16 *)ARR_DATA_PTR(arr);
		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
			SPITupleTable *PAR_tupTable;
			int     PAR_processed;
			HeapTuple PAR_resTuple;
			char *value;
			q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
			sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
			ret = SPI_exec(q,1);
			if (ret != SPI_OK_SELECT || SPI_processed != 1) {
				elog(ERROR, "Fatal Error:no PK for FK relation");
				return -2;
			}
			pfree(q);
			PAR_tupTable = SPI_tuptable;
			PAR_processed = SPI_processed;
			PAR_resTuple = PAR_tupTable->vals[0];
			value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
			memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
			pfree(value);

		}
		
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
		confrelname = (char *) DatumGetName(resDatum);
	/*
	there must be at least 1 column in compound FK!!
	*/
		WHERE = palloc(MAX_WHERE_CLAUSE);
		if (!strncmp(thisColsTypes[0],"int",3))
			sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]);
		else
			sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]);
		for (j=1;j<numOfCols;j++) {
			if (!strncmp(thisColsTypes[j],"int",3))
				sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],BthisColsVals[j]);
			else
				sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],BthisColsVals[j]);
		}

		q = palloc(512);
		sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
		//elog(NOTICE,"updateAccntParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
		elog(NOTICE,"Found FK, SQL=%s",q);
#endif
		ret = SPI_exec(q,1);
		if (ret != SPI_OK_SELECT || SPI_processed != 1) {
	                elog(ERROR, "Fatal error: updateAcctParents: par table %s should have row with %s",confrelname,WHERE);
			return -2;
		}
		pfree(q);
		RealPar_processed = SPI_processed;
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		ParTableName = palloc(256);
		sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
		ParSlaveId = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
		if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) {
			pfree(ParTableName);
			continue;
		}
		if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) {
			ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f');
			ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE);
			ret = updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid);

		}
		pfree(ParTableName);
		pfree(WHERE);

	}
	return 0;
}


Attachments:

  [text/plain] pending.c.orig (60.8K, 3-pending.c.orig)
  download | inline:
/****************************************************************************
 * pending.c
 * $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $
 *
 * This file contains a trigger for Postgresql-7.x to record changes to tables
 * to a pending table for mirroring.
 * All tables that should be mirrored should have this trigger hooked up to it.
 *
 *	 Written by Steven Singer ([email protected])
 *	 (c) 2001-2002 Navtech Systems Support Inc.
 *       ALL RIGHTS RESERVED
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 *
 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 *
 ***************************************************************************/
#include <string.h>

#include "postgres.h"

#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"
#define	Int2VectorSize(n)	(offsetof(int2vector, values) + (n) * sizeof(int16))

#define TRUE 1
#define FALSE 0

PG_MODULE_MAGIC;

enum FieldUsage
{
	PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};


int storePending(char *cpTableName, HeapTuple tBeforeTuple,
			 HeapTuple tAfterTuple,
			 TupleDesc tTupdesc,
			 Oid tableOid,
			 char cOp,int slaveid, char origOp);

int storexid(void);

int handler(char *cpTableName, HeapTuple tBeforeTuple,
			 HeapTuple tAfterTuple,
			 TupleDesc tTupdesc,
			 Oid tableOid,
			 char cOp, int slaveid, char *pkxpress);

int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);


int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);

int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);*/

int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);

int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tbaleOid,int iIncludeKeyData);

int2vector *getPrimaryKey(Oid tblOid);

char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
			enum FieldUsage eKeyUsage);

bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter);

char *get_namespace_name(Oid nspid);


#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
//#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);

PG_FUNCTION_INFO_V1(recordchange);


/*****************************************************************************
 * The entry point for the trigger function.
 * The Trigger takes a single SQL 'text' argument indicating the name of the
 * table the trigger was applied to.  If this name is incorrect so will the
 * mirroring.
 ****************************************************************************/
Datum
recordchange(PG_FUNCTION_ARGS)
{
	TriggerData *trigdata;
	TupleDesc	tupdesc;
	HeapTuple	beforeTuple = NULL;
	HeapTuple	afterTuple = NULL;
	HeapTuple	retTuple = NULL;
	char	   *tblname;
	char		op = 0;
	char 	   *schemaname;
	char	   *fullyqualtblname;
	char *pkxpress;
	if (fcinfo->context != NULL)
	{

		if (SPI_connect() < 0)
		{
			elog(NOTICE, "recordchange could not connect to SPI");
			return -1;
		}
		trigdata = (TriggerData *) fcinfo->context;
		/* Extract the table name */
		tblname = SPI_getrelname(trigdata->tg_relation);
#ifndef NOSCHEMAS
		schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation));
		fullyqualtblname = palloc(strlen(tblname) + 
					      strlen(schemaname) + 6);
 		sprintf(fullyqualtblname,"\"%s\".\"%s\"",
			schemaname,tblname);
#else
		fullyqualtblname = palloc(strlen(tblname) + 3);
		sprintf(fullyqualtblname,"\"%s\"",tblname);
#endif
		tupdesc = trigdata->tg_relation->rd_att;
		if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
		{
			retTuple = trigdata->tg_newtuple;
			beforeTuple = trigdata->tg_trigtuple;
			afterTuple = trigdata->tg_newtuple;
			op = 'u';

		}
		else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
		{
			retTuple = trigdata->tg_trigtuple;
			afterTuple = trigdata->tg_trigtuple;
			op = 'i';
		}
		else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
		{
			retTuple = trigdata->tg_trigtuple;
			beforeTuple = trigdata->tg_trigtuple;
			op = 'd';
		}

		if (storexid()) {
			elog(ERROR, "Operation could not be mirrored. storexid problem");
			return PointerGetDatum(NULL);
		}

		pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid);
		if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,retTuple->t_tableOid, op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress))
		{
			/* An error occoured. Skip the operation. */
			elog(ERROR, "Operation could not be mirrored");
			return PointerGetDatum(NULL);

		}
#if defined DEBUG_OUTPUT
		elog(NOTICE, "Returning on success");
#endif
		pfree(fullyqualtblname);
		pfree(pkxpress);
		SPI_finish();
		return PointerGetDatum(retTuple);
	}
	else
	{
		/*
		 * Not being called as a trigger.
		 */
		return PointerGetDatum(NULL);
	}
}

/*****************************************************************************
 * stores the current xid in dbmirror_xactions
 *****************************************************************************/

int
storexid(void) {
	//char	   *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES ($1)";
	char	   *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT $1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)";

	int		iResult = 0;

	Datum		saPlanData[1];
	Oid		taPlanArgTypes[1] = {INT4OID};
	void	   *vpPlan;

	vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes);
	if (vpPlan == NULL)
		elog(NOTICE, " storexid Error creating plan");

	saPlanData[0] = Int32GetDatum(GetCurrentTransactionId());

	iResult = SPI_execp(vpPlan, saPlanData, NULL , 1);
	if (iResult < 0) {
		elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase, iResult);
		return -1;
	}


#if defined DEBUG_OUTPUT
	elog(NOTICE, "row successfully stored in dbmirror_xactions");
#endif

	return 0;

}

/*****************************************************************************
 * Constructs and executes an SQL query to write a record of this tuple change
 * to the pending table.
 *****************************************************************************/
int
storePending(char *cpTableName, HeapTuple tBeforeTuple,
			 HeapTuple tAfterTuple,
			 TupleDesc tTupDesc,
			 Oid tableOid,
			 char cOp, int slaveid, char origOp)
{
	char	   *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)";

	int			iResult = 0;
	HeapTuple	tCurTuple;
	char nulls[5]="    ";

	//Points the current tuple(before or after)
		Datum		saPlanData[5];
	Oid			taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID, INT4OID, CHAROID};
	void	   *vpPlan;

	tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;



	vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes);
	if (vpPlan == NULL)
		elog(NOTICE, "Error creating plan");
	/* SPI_saveplan(vpPlan); */

	saPlanData[0] = PointerGetDatum(cpTableName);
	saPlanData[1] = CharGetDatum(cOp);
	saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
	saPlanData[3] = Int32GetDatum(slaveid);
	if (slaveid <=0) nulls[3]='n';
	saPlanData[4] = CharGetDatum(origOp);

	iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
	if (iResult < 0)
		elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);


#if defined DEBUG_OUTPUT
	elog(NOTICE, "row successfully stored in pending table");
#endif

	if (cOp == 'd')
	{
		/**
		 * This is a record of a delete operation.
		 * Just store the key data.
		 */
		iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
	}
	else if (cOp == 'i')
	{
		/**
		 * An Insert operation.
		 * Store all data
		 */
		iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE);

	}
	else
	{
		/* op must be an update. */
		iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
		iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE);
	}

#if defined DEBUG_OUTPUT
	elog(NOTICE, "Done storing keyinfo/data");
#endif

	return iResult;

}

int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
			 TupleDesc tTupleDesc, Oid tableOid)
{

	Oid			saPlanArgTypes[1] = {VARCHAROID};
	char	   *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
	void	   *pplan;
	Datum		saPlanData[1];
	char	   *cpKeyData;
	char	   *cpKeyData_tmp;
	int			iRetCode;

	pplan = SPI_prepare(insQuery, 1, saPlanArgTypes);
	if (pplan == NULL)
	{
		elog(NOTICE, "Could not prepare INSERT plan");
		return -1;
	}

	/* pplan = SPI_saveplan(pplan); */
	cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, PRIMARY);
	if (cpKeyData == NULL)
	{
		elog(ERROR,"Could not determine primary key data");
		return -1;
	}
#if defined DEBUG_OUTPUT
	elog(NOTICE, "KeyData: %s", cpKeyData);
#endif
	cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
	memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
	SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
	saPlanData[0] = PointerGetDatum(cpKeyData_tmp);

	iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);

	if (cpKeyData != NULL)
		pfree(cpKeyData);
	if (cpKeyData_tmp != 0)
		pfree(cpKeyData_tmp);

	if (iRetCode != SPI_OK_INSERT)
	{
		elog(NOTICE, "Error inserting row in storeKeyInfo");
		return -1;
	}
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Insert successful");
#endif

	return 0;

}




int2vector *
getPrimaryKey(Oid tblOid)
{
	char	   *queryBase;
	char	   *query;
	bool		isNull;
	int2vector *resultKey;
	int2vector *tpResultKey;
	HeapTuple	resTuple;
	Datum		resDatum;
	int			ret;

	queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
	query = palloc(strlen(queryBase) + MAX_OID_LEN + 1);
	sprintf(query, "%s%d", queryBase, tblOid);
	ret = SPI_exec(query, 1);
	if (ret != SPI_OK_SELECT || SPI_processed != 1)
	{
		elog(NOTICE, "Could not select primary index key");
		return NULL;
	}

	resTuple = SPI_tuptable->vals[0];
	resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull);

	if (isNull) {
		elog(NOTICE, "PKey is NULL");
		return NULL;
	}

	tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
	int n=tpResultKey->dim1;
	resultKey = palloc(Int2VectorSize(n));
	if (n > 0)
		memcpy(resultKey->values, tpResultKey->values, n * sizeof(int16));

	SET_VARSIZE(resultKey, Int2VectorSize(n));
	resultKey->ndim = 1;
	resultKey->dataoffset = 0;
	resultKey->elemtype = INT2OID;
	resultKey->dim1 = n;
	resultKey->lbound1 = 0;

	pfree(query);
	return resultKey;
}

/******************************************************************************
 * Stores a copy of the non-key data for the row.
 *****************************************************************************/
int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
{

	Oid			planArgTypes[1] = {VARCHAROID};
	char	   *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
	SPIPlanPtr pplan;
	Datum		planData[1];
	char	   *cpKeyData;
	char	   *cpKeyData_tmp;
	int			iRetValue;

	pplan = SPI_prepare(insQuery, 1, planArgTypes);
	if (pplan == NULL)
	{
		elog(NOTICE, "Could not prepare INSERT plan");
		return -1;
	}

	/* pplan = SPI_saveplan(pplan); */
	if (iIncludeKeyData == 0)
		cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc, tableOid, NONPRIMARY);
	else
		cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, ALL);

	cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
	memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
        SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
	planData[0] = PointerGetDatum(cpKeyData_tmp);

	iRetValue = SPI_execp(pplan, planData, NULL, 1);

	if (cpKeyData != 0)
		pfree(cpKeyData);
	if (cpKeyData_tmp != 0)
		pfree(cpKeyData_tmp);

	if (iRetValue != SPI_OK_INSERT)
	{
		elog(NOTICE, "Error inserting row in storeData");
		return -1;
	}
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Insert successful");
#endif

	return 0;

}

/**
 * Packages the data in tTupleData into a string of the format
 * FieldName='value text'  where any quotes inside of value text
 * are escaped with a backslash and any backslashes in value text
 * are esacped by a second back slash.
 *
 * tTupleDesc should be a description of the tuple stored in
 * tTupleData.
 *
 * eFieldUsage specifies which fields to use.
 *	PRIMARY implies include only primary key fields.
 *	NONPRIMARY implies include only non-primary key fields.
 *	ALL implies include all fields.
 */
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
			enum FieldUsage eKeyUsage)
{
	int			iNumCols;
	int2vector *tpPKeys = NULL;
	int			iColumnCounter;
	char	   *cpDataBlock;
	int			iDataBlockSize;
	int			iUsedDataBlock;

	iNumCols = tTupleDesc->natts;

	if (eKeyUsage != ALL)
	{
		tpPKeys = getPrimaryKey(tableOid);
		if (tpPKeys == NULL)
			return NULL;
	}
#if defined DEBUG_OUTPUT
	if (tpPKeys != NULL)
		elog(NOTICE, "Have primary keys");
#endif
	cpDataBlock = palloc(BUFFER_SIZE);
	iDataBlockSize = BUFFER_SIZE;
	iUsedDataBlock = 0;			/* To account for the null */

	for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
	{
		int			iIsPrimaryKey;
		int			iPrimaryKeyIndex;
		char	   *cpUnFormatedPtr;
		char	   *cpFormatedPtr;

		char	   *cpFieldName;
		char	   *cpFieldData;

		if (eKeyUsage != ALL)
		{
			/* Determine if this is a primary key or not. */
			iIsPrimaryKey = 0;
			int16 *tpPKeysV = tpPKeys->values;
			int tpPKeysSize = tpPKeys->dim1;
			for (iPrimaryKeyIndex = 0; iPrimaryKeyIndex<tpPKeysSize;
				 iPrimaryKeyIndex++)
			{
				if (tpPKeysV[iPrimaryKeyIndex] == iColumnCounter)
				{
					iIsPrimaryKey = 1;
					break;
				}
			}
			if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
			{
				/**
				 * Don't use.
				 */
#if defined DEBUG_OUTPUT
				elog(NOTICE, "Skipping column");
#endif
				continue;
			}
		}						/* KeyUsage!=ALL */
#ifndef  NODROPCOLUMN
               // this comment is for 11
               if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
               //if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
                 {
                   /**
                    * This column has been dropped.
                    * Do not mirror it.
                    */
                   continue;
                 }
#endif
/***************************************
Edw mpainei o kwdikas gia elegxo enanti,
tou dbmirror_exclude_attributes, gia
eksairetea columns
****************************************/
		if (isExcluded(cpTableName,tTupleDesc,iColumnCounter)) {
			continue;
		}

		// this comment is for 11
		cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
		//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
		elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
		while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
		{
			cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
			iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
		}
		sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
		iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
		cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);

		cpUnFormatedPtr = cpFieldData;
		cpFormatedPtr = cpDataBlock + iUsedDataBlock;
		if (cpFieldData != NULL)
		{
			*cpFormatedPtr = '\'';
			iUsedDataBlock++;
			cpFormatedPtr++;
		}
		else
		{
			sprintf(cpFormatedPtr," ");
			iUsedDataBlock++;
			cpFormatedPtr++;
			continue;

		}
#if defined DEBUG_OUTPUT
		elog(NOTICE, "FieldData: %s", cpFieldData);
		elog(NOTICE, "Starting format loop");
#endif
		while (*cpUnFormatedPtr != 0)
		{
			while (iDataBlockSize - iUsedDataBlock < 2)
			{
				cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
				iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
				cpFormatedPtr = cpDataBlock + iUsedDataBlock;
			}
			if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'')
			{
				*cpFormatedPtr = '\\';
				cpFormatedPtr++;
				iUsedDataBlock++;
			}
			*cpFormatedPtr = *cpUnFormatedPtr;
			cpFormatedPtr++;
			cpUnFormatedPtr++;
			iUsedDataBlock++;
		}

		pfree(cpFieldData);

		while (iDataBlockSize - iUsedDataBlock < 3)
		{
			cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
			iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
			cpFormatedPtr = cpDataBlock + iUsedDataBlock;
		}
		sprintf(cpFormatedPtr, "' ");
		iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
		elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif

	}							/* for iColumnCounter  */
	if (tpPKeys != NULL)
		pfree(tpPKeys);
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Returning: DataBlockSize:%d iUsedDataBlock:%d",iDataBlockSize,
			iUsedDataBlock);
#endif
	memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);

	return cpDataBlock;

}





bool isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber) {
	char *qb1;
	char *qb2;
	char *q;
	char *fieldName;
	int ret;
	HeapTuple resTuple;
	SPITupleTable *SIDKEY_tupTable;
	int     SIDKEY_processed;
	char *value;
#if defined DEBUG_OUTPUT
	elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
	fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
	elog(NOTICE, "fieldName=%s",fieldName);
#endif
	qb1 = "SELECT ";
	qb2 = " = any(attnames) from dbmirror_exclude_attributes where tblname=";
#if defined DEBUG_OUTPUT
	elog(NOTICE, "%s q size=%d",fieldName,strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2);
#endif
/*
to +1 parakatw einai gia to 0 (null)
*/
	q = palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
	sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	pfree(fieldName);

	if (ret != SPI_OK_SELECT || SPI_processed != 1) return FALSE;
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (value == NULL) return FALSE;

	if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0) {
		pfree(value);
		return TRUE;
	} 
	else {
		pfree(value);
		return FALSE;
	}
}




#define MAXKCOLS 32
#define MAXCOL_LEN 128
int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char *pkxpress) {

	HeapTuple       tuple;
	tuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
#if defined DEBUG_OUTPUT
	elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d, op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress);
#endif

	if (slaveid == 0 && op == 'm') {
		elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen");
		return -1;
	}
	/*
	ISEXPUNCOND
	*/
	else if (slaveid == 0) return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
	/*
	ISEXPWITHSLAVEID
	*/
	else if (slaveid > 0 && op =='d') {
		int retval;
		retval =  storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
		return updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid);
	}
	else if (slaveid > 0 && op == 'i') {
		int retval;
		retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
		retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
		return retval;
	}
	else if (slaveid > 0 && op == 'u') {
		int retval;
		int old_slaveid;
	 	old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
		retval = handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
		if (old_slaveid != -3 && old_slaveid != slaveid) {
			retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
		}
		if (old_slaveid != slaveid) {
			retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
		}
		else {
			retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
		}
		if (old_slaveid != -3) {
			retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
		}
		return retval;
	}
	else if (slaveid == -3 && (op =='d' || op =='i')) {
		return 0;
	}
	else if (slaveid == -3 && op =='u') {
		int retval=0;
		int old_slaveid;
	 	old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
		if (old_slaveid > 0) {
			retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
			retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
		}
		return retval;
	}
	/*
	ISIMPLBACKWARD KALESMENO apo handleParents
	*/
	else if (slaveid >0 && op == 'm') {
		if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) {
		return 0;
/*
SE SXOLIO Giati twra mporei enas pateras ISEXPUNCOND/ISEXPWITHSLAVEID na exei paidi ISIMPLBACKWARD
*/
/*
			elog(ERROR,"Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen");
*/
		}
		if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) {
		return 0;
/*
*/
		}
		if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) {
		return 0;
		}
		if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0;
		else {
			int retval;
			retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
			retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
			return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress);
		}
	}
	/*
	ISIMPL* but called from trigger ('i','d','u')
	*/

	/*
	ISIMPLBACKWARD
	*/
	else if (slaveid == -1) {
		if (op == 'i') return 0;
		/*
		Delete in this fashion may happen for "orphan" rows.
		The deletions will be "triggered" by childern deletions/updates.
		*/
		else if (op == 'd') {
			int retval=0;
			int *slaves;
			int *run;
			slaves = getSlaves(cpTableName,pkxpress);
			if (slaves == NULL) return 0;
			for (run=slaves;*run;run++) {
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
				retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run);
			}

			pfree(slaves);
			return retval || deleteAccnt(cpTableName,pkxpress); 

		}
		else if (op == 'u') {
			int retval=0;
			int *slaves;
			int *run;

			slaves = getSlaves(cpTableName,pkxpress);
#if defined DEBUG_OUTPUT
				elog(NOTICE,"in slaveid=-1, op='u', slaves=%d",(int)slaves);
#endif
			if (slaves == NULL) return 0;
			for (run=slaves;*run;run++) {
#if defined DEBUG_OUTPUT
				elog(NOTICE,"in slaveid=-1, op='u', runslave=%d",*run);
#endif
				retval = retval || handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
				retval = retval || updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
			}
			pfree(slaves);
			return retval;
		}
	}
	/*
	ISIMPLFORWARD
	*/
	else if (slaveid == -2) {
		if (op == 'i') {
			int retval=0;
			int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);
			if (forw_slaveid>0) {
				retval = retval || handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
			}
			return retval;
		}
		else if (op == 'd') {
			int retval=0;
			int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);

			if (forw_slaveid>0) {
				retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
				retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid);
			}
			return retval ; 

		}
		else if (op == 'u') {
/*
POTE ALLAGH TOU VALUE TOU COLUMN NAME TOU PATH dhl tou COLUMN ME NAME PATHCOL
*/
			int retval=0;

			int forw_slaveid = getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc);
			int old_slaveid = getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc);
			//char origop = getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc);
			//if (origop == 'i') {
			//	old_slaveid = forw_slaveid;
			//}
			//elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' , forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid);
			// LEGACY CODE as of 2016-03-18
			//retval = retval || handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
			//retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid);
			//retval = retval || updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
			if (forw_slaveid>0) {
				retval = handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
				if (old_slaveid != -3 && old_slaveid != forw_slaveid) {
					retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
				}
				if (old_slaveid != forw_slaveid) {
					retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op);
				}
				else {
					retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
				}
				if (old_slaveid != -3) {
					retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
				}
			}
			else { // ISIMPLFORWARD me vslid IS NULL --> -3
				if (old_slaveid > 0) {
					retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
					retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
				}
			}
			return retval;
		}
	}
	return 0;
}

int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
	char *slave_siteidkeyname;
	char *qb;
	char *q;
	char *foo;
	int ret;
	HeapTuple resTuple;
	int attnum;
	int slaveid;
	SPITupleTable *SIDKEY_tupTable;
	int     SIDKEY_processed;
#if defined DEBUG_OUTPUT
	elog(NOTICE, "in getSlaveId of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
		/*
		ISIMPLBACKWARD
		*/
	if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1;
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
		/*
		ISEXPUNCOND
		*/
	if (slave_siteidkeyname == NULL) return 0;
		/*
		ISIMPLFORWARD
		*/
	if (strncmp(slave_siteidkeyname,"___",3) == 0) {
		pfree(slave_siteidkeyname);
		return -2;
	}
	attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		elog(ERROR,"WRONG slaveidkeyname=%s given for table %s",slave_siteidkeyname,cpTableName);
		pfree(slave_siteidkeyname);
		return 0;
	}
	foo=SPI_getvalue(tuple,tupleDesc,attnum);
		/*
		ISEXPWITHSLAVEID BUT siteidkeyname is null
		*/
	if (foo == NULL) {
		pfree(slave_siteidkeyname);
		return -3;
	}
	pfree(slave_siteidkeyname);
	slaveid = atoi(foo);
	pfree(foo);
		/*
		ISEXPWITHSLAVEID
		*/
	return slaveid;
}

int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getComputedSlaveId of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (slave_siteidkeyname == NULL)  {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	if (strncmp(slave_siteidkeyname,"___",3) != 0) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
//	elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
	path = slave_siteidkeyname + 3;
//	elog(NOTICE,"path=-%s-",path);
	run = index(path,'_');
	if (run == NULL) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	pathcol = palloc(MAXCOL_LEN);
//	elog(NOTICE,"path=-%d-",(unsigned)path);
//	elog(NOTICE,"run=-%d-",(unsigned)run);
//	elog(NOTICE,"run-path=-%d-",run-path);
	*run = 0;
	strncpy(pathcol,path,run-path+1);

//	elog(NOTICE,"pathcol=-%s-",pathcol);

	forw_table = run+1;

	attnum = SPI_fnumber(tupleDesc,pathcol);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
		return 0;
        }
	pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
	
	/*
	forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
	*/

	run = index(forw_table,'@');
	if (run == NULL) {
		forw_table_id = palloc(MAXCOL_LEN);
		strncpy(forw_table_id,"id",3);
	}
	else {
		forw_table_id = palloc(MAXCOL_LEN);
		*run=0;
		strncpy(forw_table_id,forw_table,run-forw_table+1);
		forw_table=run+1;
	}

	q = palloc(512);
	sprintf(q,"SELECT * FROM public.%s where %s='%s'",forw_table,forw_table_id,pathcolval);
	ret = SPI_exec(q,1);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		pfree(slave_siteidkeyname);
		elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should have row with %s=%s",forw_table,forw_table_id,pathcolval);
		return -2;
	}
	pfree(q);
	RealPar_tupTable = SPI_tuptable;
	RealPar_tuple = RealPar_tupTable->vals[0];
	ParTableName = palloc(256);
	sprintf(ParTableName,"\"public\".\"%s\"",forw_table);
	ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
	if (ret == -2)  {
		pfree(slave_siteidkeyname);
		ret = getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
		pfree(ParTableName);
		return ret;
	}
	if (ret <= 0) {
		pfree(slave_siteidkeyname);
		pfree(ParTableName);
//		elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should have slaveid>0",forw_table);
/***
		apopiera xeirismou NULL slaveid, dhl isodynamo ISEXPWITHSLAVEID kai slaveid IS NULL
***/
		return -3;
	}
	pfree(ParTableName);
	pfree(slave_siteidkeyname);
	return ret;
/*
PROSOXH:
To swsto einai apla sto slave_siteidkeyname na fylame MONO to parent table, kai meta
me query na briskoume to tuple tou parent table.
*/
}

int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (slave_siteidkeyname == NULL)  {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	if (strncmp(slave_siteidkeyname,"___",3) != 0) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
//	elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
	path = slave_siteidkeyname + 3;
//	elog(NOTICE,"path=-%s-",path);
	run = index(path,'_');
	if (run == NULL) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	pathcol = palloc(MAXCOL_LEN);
//	elog(NOTICE,"path=-%d-",(unsigned)path);
//	elog(NOTICE,"run=-%d-",(unsigned)run);
//	elog(NOTICE,"run-path=-%d-",run-path);
	*run = 0;
	strncpy(pathcol,path,run-path+1);

//	elog(NOTICE,"pathcol=-%s-",pathcol);

	forw_table = run+1;

	attnum = SPI_fnumber(tupleDesc,pathcol);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
		return 0;
        }
	pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
	
	/*
	forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
	*/

	run = index(forw_table,'@');
	if (run == NULL) {
		forw_table_id = palloc(MAXCOL_LEN);
		strncpy(forw_table_id,"id",3);
	}
	else {
		forw_table_id = palloc(MAXCOL_LEN);
		*run=0;
		strncpy(forw_table_id,forw_table,run-forw_table+1);
		forw_table=run+1;
	}
/*
consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
an parent's op=='d' tote sigoura parent's origop=='u'
*/
	q = palloc(512);
	sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
		if (foo == NULL) {
			slaveid_d = -3;
		}
		else {
			slaveid_d = atoi(foo);
			pfree(foo);
		}
		pfree(q);
		pfree(pathcolval);
		return slaveid_d;
	}
/* dirty patch code to handle copy supplycase. it was supposed to solve the problem of not turning 'u' to 'i' for e.g supplycasesquotes */ 
/* but it doesnt work for fb_ , as it turns 'i' to 'u' as per : dbmirror issues when changing vslid (null->int, int->null, intX->intY) */ 
/* following code must be here (not commended) as of 2018-12-20 */
/* the aim is make everything work correctly, supplies + fb_ + all type ISIMPLFORWARD (-2) tables */
/*

	q = palloc(512);
	sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",cpTableName,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
		if (foo == NULL) {
			slaveid_d = -3;
		}
		else {
			slaveid_d = atoi(foo);
			pfree(foo);
		}
		pfree(q);
		pfree(pathcolval);
		return slaveid_d;
	}
*/

/**
pls consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing

if (parent op=='i' and origop <> 'i' exist) -pou simainei oti origop='u' kai (eite slaveid NULL -> NOT NULL eite X -> Y, X!=Y) - we consider this as an insert and return -3 in order to communicate this to handler().
NOTE, this means that it is ILLEGAL for the app to do UPDATE on the parent, then update on the kid, and then perform a second UPDATE on the kid, this will result in TWO inserts and produce an duplicate ERROR.
else - diladi origop == 'i' or no parent op=='i' exists - pou simainei : (
eite parent origop == 'i' (synepagetai oti parent op == 'i') 
eite parent op != 'i' : (synepagetai : 
	eite parent op == 'd' opote den tha ftasei edo o kwdikas, logw tou 1ou statement pio pano
	eite parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) me X==Y )
)
)
 then we let the code fallback to return getComputedSlaveId(cpTableName,tuple,tupleDesc) which will return forw_slaveid .
**/
	q = palloc(512);
	sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		pfree(slave_siteidkeyname);
		pfree(pathcolval);
		return -3;
	}

	pfree(slave_siteidkeyname);
	pfree(pathcolval);
	return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */

/*
* PROYPOTHETOUME OTI O AMESOS BABAS exei entry sto dbmirror_pending me to idio xid kai op='d'
*/
}

/*
char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
char origop;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName);
#endif
	qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
	q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
	sprintf(q,"%s'%s'",qb,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (slave_siteidkeyname == NULL)  {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	if (strncmp(slave_siteidkeyname,"___",3) != 0) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
//	elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
	path = slave_siteidkeyname + 3;
//	elog(NOTICE,"path=-%s-",path);
	run = index(path,'_');
	if (run == NULL) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
	}
	pathcol = palloc(MAXCOL_LEN);
//	elog(NOTICE,"path=-%d-",(unsigned)path);
//	elog(NOTICE,"run=-%d-",(unsigned)run);
//	elog(NOTICE,"run-path=-%d-",run-path);
	*run = 0;
	strncpy(pathcol,path,run-path+1);

//	elog(NOTICE,"pathcol=-%s-",pathcol);

	forw_table = run+1;

	attnum = SPI_fnumber(tupleDesc,pathcol);
	if (attnum == SPI_ERROR_NOATTRIBUTE) {
		pfree(slave_siteidkeyname);
		elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
		return 0;
        }
	pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
*/	
	/*
	forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
	*/
/*

	run = index(forw_table,'@');
	if (run == NULL) {
		forw_table_id = palloc(MAXCOL_LEN);
		strncpy(forw_table_id,"id",3);
	}
	else {
		forw_table_id = palloc(MAXCOL_LEN);
		*run=0;
		strncpy(forw_table_id,forw_table,run-forw_table+1);
		forw_table=run+1;
	}

	q = palloc(512);
	sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
	ret = SPI_exec(q,1);
	if (ret == SPI_OK_SELECT && SPI_processed == 1) {
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
		if (foo == NULL) {
			pfree(q);
			pfree(pathcolval);
			elog(ERROR,"WRONG forward parent table %s has null origop in this transaction",forw_table);
		}
		else {
			origop = foo[0];
			pfree(foo);
		}
		pfree(q);
		pfree(pathcolval);
		return origop;
	}
	return ' ';
}
*/

#define MAX_WHERE_CLAUSE	512
#define MAX_QUERY_LEN	512
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
	char *qb;
	char *q;
	bool isNull;
	int ret;
	int i;
	HeapTuple resTuple;
	Datum resDatum;
	Oid confrelid;
	char *confrelname;
	int16	*thisCols;
	int16	*fkCols;
	int16	*run;
	ArrayType *arr;
	char	AthisColsVals[MAXKCOLS][MAXCOL_LEN];
	char	fkColsNames[MAXKCOLS][MAXCOL_LEN];
	char	thisColsTypes[MAXKCOLS][100];
	short	numOfCols;
	SPITupleTable *FK_tupTable;
	int	FK_processed;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in handleParents of Table=%d",tableOid);
#endif

	qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
	q = palloc(strlen(qb)+MAX_OID_LEN+1);
	sprintf(q,"%s%d",qb,tableOid);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed < 0) { 
		elog(NOTICE, "no FKs");
		return 0;
	}
	/*
	For every FK dependency we track and handle the parent table
	*/
	FK_tupTable = SPI_tuptable;
	FK_processed = SPI_processed;

	for (i=0;i<FK_processed;i++) {
		char *WHERE;
		char *ParTableName;
		int j;
		char FkHasNullValueORXcluded=0;
		char handleit=0;

		SPITupleTable *RealPar_tupTable;
		int     RealPar_processed;
		HeapTuple RealPar_tuple;
		int colrun;
		SPITupleTable *thisatt_tupTable;
		int     thisatt_processed;
		HeapTuple thisatt_tuple;

		resTuple = FK_tupTable->vals[i];


		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
		confrelid = (Oid) DatumGetObjectId(resDatum);

		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		thisCols = (int16 *)ARR_DATA_PTR(arr);
		numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
		int DIM_0 = ARR_DIMS(arr)[0];
		int LBOUND_0 = ARR_LBOUND(arr)[0];
//		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
//		confrelname = (char *) DatumGetName(resDatum);
//		elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0);
#endif
		for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
			int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
			char *fooname;

			q = palloc(512);
			sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
			ret = SPI_exec(q,1);
			if (ret != SPI_OK_SELECT || SPI_processed != 1) {
		                elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
				return -2;
			}
			pfree(q);
			thisatt_processed = SPI_processed;
			thisatt_tupTable = SPI_tuptable;
			thisatt_tuple = thisatt_tupTable->vals[0];
			fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
			char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
			char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));

			

#if defined DEBUG_OUTPUT
			elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s",fooname,*run,thiscolrun,value);
#endif
			if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun))  {
				FkHasNullValueORXcluded=1;
				break;
			}
			memcpy(&(AthisColsVals[colrun][0]),value,strlen(value)+1);
			memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
			
			if (Btuple != NULL) {
				char *Bvalue = SPI_getvalue(Btuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
				elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s,Before value=%s",fooname,*run,thiscolrun,value,Bvalue);
#endif
				if (Bvalue == NULL) handleit=1;
				else if (strcmp(Bvalue,value)) {
					handleit=1;
					pfree(Bvalue);
				}

			}
			else handleit=1;
			pfree(value);
		}
#if defined DEBUG_OUTPUT
		elog(NOTICE,"in handleParents--> END OF ATTR LOOP");
		elog(NOTICE,"handle it = %d, ",handleit);
		elog(NOTICE,"FkHasNullValueORXcluded  = %d, ",FkHasNullValueORXcluded);
#endif
		if (FkHasNullValueORXcluded || !handleit) continue;
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		fkCols = (int16 *)ARR_DATA_PTR(arr);
		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
			SPITupleTable *PAR_tupTable;
			int     PAR_processed;
			HeapTuple PAR_resTuple;
			char *value;
			q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
			sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
			ret = SPI_exec(q,1);
			if (ret != SPI_OK_SELECT || SPI_processed != 1) {
				elog(ERROR, "Fatal Error:no PK for FK relation");
				return -2;
			}
			pfree(q);
			PAR_tupTable = SPI_tuptable;
			PAR_processed = SPI_processed;
			PAR_resTuple = PAR_tupTable->vals[0];
			value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
			memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
			pfree(value);

		}
		
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
		confrelname = (char *) DatumGetName(resDatum);
	/*
	There must be at least 1 column in compound FK!!
	*/
		WHERE = palloc(MAX_WHERE_CLAUSE);
		if (!strncmp(thisColsTypes[0],"int",3))
			sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]);
		else
			sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]);
		for (j=1;j<numOfCols;j++) {
			if (!strncmp(thisColsTypes[j],"int",3))
				sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],AthisColsVals[j]);
			else
				sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],AthisColsVals[j]);
		}

		q = palloc(512);
		sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
		//elog(NOTICE,"handleParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
		elog(NOTICE,"Found FK, SQL=%s",q);
#endif
		ret = SPI_exec(q,1);
		if (ret != SPI_OK_SELECT || SPI_processed != 1) {
	                elog(ERROR, "Fatal error: handleParents: par table %s should have row with %s",confrelname,WHERE);
			return -2;
		}
		pfree(q);
		RealPar_processed = SPI_processed;
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		ParTableName = palloc(256);
		sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
		ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE);
		pfree(ParTableName);
		pfree(WHERE);

	}
	return 0;
}

int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	/*
	sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0;
	else return 1;
	*/
	sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0;
	else return 1;
}

int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) {
	char *q;
	int ret;
	bool isNull;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
	return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull));
}

int createAccnt(char *cpTableName, int slaveid, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in createAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	/*
	public.dbmirror_accounting.cnt DEFAULT = 1
	*/
	sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,1);
	pfree(q);
	if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2;
	else return 0;
}


char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) {
	int2vector *pk;
	int i;
	char *WHERE = palloc(MAX_WHERE_CLAUSE);

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in getPKxpress of Table=%s",cpTableName);
#endif

	pk = getPrimaryKey(tableOid);
	if (pk == NULL) return NULL;
	int16 *pkV = pk->values;
	int pkSize = pk->dim1;
	if (pkSize>0) {
		char *keyname;
		char *keyval;
		keyname = SPI_fname(tupleDesc,pkV[0]);
		if (keyname == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[0],cpTableName);
			return NULL;
		}
		keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]);
		if (keyval == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[0],cpTableName);
			return NULL;
		}

		sprintf(WHERE,"\"%s\"=%s",keyname,keyval);
		pfree(keyname);
		pfree(keyval);
	}
	for (i=1;i<pkSize;i++) {
		char *keyname;
		char *keyval;
		keyname = SPI_fname(tupleDesc,pkV[i]);
		if (keyname == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[i],cpTableName);
			return NULL;
		}
		keyval = SPI_getvalue(tuple,tupleDesc,pkV[i]);
		if (keyval == NULL) {
			elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[i],cpTableName);
			return NULL;
		}

		sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,keyname,keyval);
		pfree(keyname);
		pfree(keyval);
		
	}
	pfree(pk);
	return WHERE;
}
int *getSlaves(char *cpTableName,char *pkxpress) {
	char *q;
	int ret;
	int SLAVE_processed;
	SPITupleTable *SLAVE_tupTable;
	int *slaves;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in getSlaves of Table=%s, pkxpress=%s",cpTableName,pkxpress);
#endif

	if (pkxpress == NULL) return NULL;
	
	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL;
	SLAVE_tupTable = SPI_tuptable;
	SLAVE_processed = SPI_processed;
	slaves = palloc((SLAVE_processed+1) * sizeof(int));
#if defined DEBUG_OUTPUT
	elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed);
#endif
	for (ret=0;ret<SLAVE_processed;ret++) {
		char *slaveidStr = SPI_getvalue(SLAVE_tupTable->vals[ret],SLAVE_tupTable->tupdesc,1);
		*(slaves+ret) = atoi(slaveidStr);
		pfree(slaveidStr);
		
	}
	*(slaves+SLAVE_processed) = 0;
	return slaves;


}

int deleteAccnt (char *cpTableName, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret == SPI_OK_DELETE) return 0;
	else return -2;
}

int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) {
	char *q;
	int ret;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName);
#endif

	q = palloc(MAX_QUERY_LEN);
	sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret == SPI_OK_DELETE) return 0;
	else return -2;
}

int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
	char *qb;
	char *q;
	bool isNull;
	int ret;
	int i;
	HeapTuple resTuple;
	Datum resDatum;
	Oid confrelid;
	char *confrelname;
	int16	*thisCols;
	int16	*fkCols;
	int16	*run;
	ArrayType *arr;
	char	BthisColsVals[MAXKCOLS][MAXCOL_LEN];
	char	fkColsNames[MAXKCOLS][MAXCOL_LEN];
	char	thisColsTypes[MAXKCOLS][100];
	short	numOfCols;
	SPITupleTable *FK_tupTable;
	int	FK_processed;
	int ParSlaveId;

#if defined DEBUG_OUTPUT
	elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid);
#endif

	qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
	q = palloc(strlen(qb)+MAX_OID_LEN+1);
	sprintf(q,"%s%d",qb,tableOid);
	ret = SPI_exec(q,0);
	pfree(q);
	if (ret != SPI_OK_SELECT || SPI_processed < 0) { 
		elog(NOTICE, "no FKs");
		return 0;
	}
	/*
	For every FK dependency we track and handle the parent table
	*/
	FK_tupTable = SPI_tuptable;
	FK_processed = SPI_processed;

	for (i=0;i<FK_processed;i++) {
		char *WHERE;
		char *ParTableName;
		int j;
		char FkHasNullValueORXcluded=0;
		char decreaseit=0;

		SPITupleTable *RealPar_tupTable;
		int     RealPar_processed;
		HeapTuple RealPar_tuple;
		int colrun;
		SPITupleTable *thisatt_tupTable;
                int     thisatt_processed;
                HeapTuple thisatt_tuple;


		resTuple = FK_tupTable->vals[i];


		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
		confrelid = (Oid) DatumGetObjectId(resDatum);

		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		thisCols = (int16 *)ARR_DATA_PTR(arr);
		numOfCols=ARR_DIMS(arr)[0];
		for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
			int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
                        char *fooname;

                        q = palloc(512);
                        sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
                        ret = SPI_exec(q,1);
                        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                                elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
                                return -2;
                        }
                        pfree(q);
                        thisatt_processed = SPI_processed;
                        thisatt_tupTable = SPI_tuptable;
                        thisatt_tuple = thisatt_tupTable->vals[0];
                        fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
                        char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));

			char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));



#if defined DEBUG_OUTPUT
			elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s",fooname,*run,thiscolrun,value);
#endif
			if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun))  {
				FkHasNullValueORXcluded=1;
				break;
				/*
				Nothing is done regarding "Before" status of parent since there is not one
				*/
			}
			memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1);
			memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);

			if (Atuple != NULL) {
				char *Avalue = SPI_getvalue(Atuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
				elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s,After value=%s",fooname,*run,thiscolrun,value,Avalue);
#endif
				if (Avalue == NULL) decreaseit=1;
				else if (strcmp(Avalue,value)) {
					decreaseit=1;
					pfree(Avalue);
				}
			}
			else decreaseit=1; /* is delete operation*/
			pfree(value);
		}
#if defined DEBUG_OUTPUT
		elog(NOTICE,"in updateAccnParents--> END OF ATTR LOOP");
		elog(NOTICE,"decrease it = %d, ",decreaseit);
		elog(NOTICE,"FkHasNullValueORXcluded  = %d, ",FkHasNullValueORXcluded);
#endif

		if (FkHasNullValueORXcluded || !decreaseit) continue;


		
		
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
		fkCols = (int16 *)ARR_DATA_PTR(arr);
		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
			SPITupleTable *PAR_tupTable;
			int     PAR_processed;
			HeapTuple PAR_resTuple;
			char *value;
			q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
			sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
			ret = SPI_exec(q,1);
			if (ret != SPI_OK_SELECT || SPI_processed != 1) {
				elog(ERROR, "Fatal Error:no PK for FK relation");
				return -2;
			}
			pfree(q);
			PAR_tupTable = SPI_tuptable;
			PAR_processed = SPI_processed;
			PAR_resTuple = PAR_tupTable->vals[0];
			value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
			memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
			pfree(value);

		}
		
		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
		confrelname = (char *) DatumGetName(resDatum);
	/*
	there must be at least 1 column in compound FK!!
	*/
		WHERE = palloc(MAX_WHERE_CLAUSE);
		if (!strncmp(thisColsTypes[0],"int",3))
			sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]);
		else
			sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]);
		for (j=1;j<numOfCols;j++) {
			if (!strncmp(thisColsTypes[j],"int",3))
				sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],BthisColsVals[j]);
			else
				sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],BthisColsVals[j]);
		}

		q = palloc(512);
		sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
		//elog(NOTICE,"updateAccntParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
		elog(NOTICE,"Found FK, SQL=%s",q);
#endif
		ret = SPI_exec(q,1);
		if (ret != SPI_OK_SELECT || SPI_processed != 1) {
	                elog(ERROR, "Fatal error: updateAcctParents: par table %s should have row with %s",confrelname,WHERE);
			return -2;
		}
		pfree(q);
		RealPar_processed = SPI_processed;
		RealPar_tupTable = SPI_tuptable;
		RealPar_tuple = RealPar_tupTable->vals[0];
		ParTableName = palloc(256);
		sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
		ParSlaveId = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
		if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) {
			pfree(ParTableName);
			continue;
		}
		if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) {
			ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f');
			ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE);
			ret = updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid);

		}
		pfree(ParTableName);
		pfree(WHERE);

	}
	return 0;
}

  [text/x-log] make.log (9.0K, 4-make.log)
  download | inline:
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wcast-function-type -Wshadow=compatible-local -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -fPIC -DPIC -fvisibility=hidden -I. -I./ -I/usr/local/pgsql/include/server -I/usr/local/pgsql/include/internal -I/usr/local/include     -c -o pending.o pending.c
pending.c: In function 'storePending':
pending.c:249:25: warning: variable 'tCurTuple' set but not used [-Wunused-but-set-variable]
  249 |         HeapTuple       tCurTuple;
      |                         ^~~~~~~~~
In file included from /usr/local/pgsql/include/server/nodes/execnodes.h:34,
                 from /usr/local/pgsql/include/server/commands/trigger.h:18,
                 from /usr/local/pgsql/include/server/executor/spi.h:16,
                 from pending.c:36:
pending.c: In function 'getPrimaryKey':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:402:54: note: in expansion of macro 'PG_DETOAST_DATUM'
  402 |         tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                      ^~~~~~~~~~~~~~~~
In file included from pending.c:34:
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:403:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
  403 |         int n=tpResultKey->dim1;
      |         ^~~
pending.c: In function 'packageData':
pending.c:527:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
  527 |                         int16 *tpPKeysV = tpPKeys->values;
      |                         ^~~~~
pending.c:551:29: error: 'struct TupleDescData' has no member named 'attrs'
  551 |                if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
      |                             ^~
pending.c:571:71: error: 'struct TupleDescData' has no member named 'attrs'
  571 |                 cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
      |                                                                       ^~
pending.c: In function 'isExcluded':
pending.c:664:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
  664 |         int     SIDKEY_processed;
      |                 ^~~~~~~~~~~~~~~~
pending.c: In function 'handler':
pending.c:729:21: warning: variable 'retval' set but not used [-Wunused-but-set-variable]
  729 |                 int retval;
      |                     ^~~~~~
pending.c: In function 'getSlaveId':
pending.c:928:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
  928 |         int     SIDKEY_processed;
      |                 ^~~~~~~~~~~~~~~~
pending.c: In function 'getComputedSlaveId':
pending.c:993:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
  993 | int SIDKEY_processed;
      |     ^~~~~~~~~~~~~~~~
pending.c: In function 'getOldComputedSlaveId':
pending.c:1119:7: warning: unused variable 'ParTableName' [-Wunused-variable]
 1119 | char *ParTableName;
      |       ^~~~~~~~~~~~
pending.c:1116:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
 1116 | int SIDKEY_processed;
      |     ^~~~~~~~~~~~~~~~
pending.c: In function 'handleParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1444:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1444 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1472:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
 1472 |                         char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
      |                         ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1509:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1509 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1513:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
 1513 |                         int     PAR_processed;
      |                                 ^~~~~~~~~~~~~
pending.c:1434:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
 1434 |                 int     thisatt_processed;
      |                         ^~~~~~~~~~~~~~~~~
pending.c:1430:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
 1430 |                 int     RealPar_processed;
      |                         ^~~~~~~~~~~~~~~~~
pending.c: In function 'getPKxpress':
pending.c:1651:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
 1651 |         int16 *pkV = pk->values;
      |         ^~~~~
pending.c: In function 'updateAccntParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1825:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1825 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1846:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
 1846 |                         char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
      |                         ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1891:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1891 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1895:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
 1895 |                         int     PAR_processed;
      |                                 ^~~~~~~~~~~~~
pending.c:1814:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
 1814 |                 int     thisatt_processed;
      |                         ^~~~~~~~~~~~~~~~~
pending.c:1810:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
 1810 |                 int     RealPar_processed;
      |                         ^~~~~~~~~~~~~~~~~
gmake: *** [<builtin>: pending.o] Error 1

  [text/x-patch] pending_changes_against_18beta1.patch (3.8K, 5-pending_changes_against_18beta1.patch)
  download | inline diff:
diff --git a/code/dbmirror/pending.c b/code/dbmirror/pending.c
index fca76318..b1c61e02 100644
--- a/code/dbmirror/pending.c
+++ b/code/dbmirror/pending.c
@@ -98,7 +98,7 @@ char *get_namespace_name(Oid nspid);
 
 #define BUFFER_SIZE 256
 #define MAX_OID_LEN 10
-//#define DEBUG_OUTPUT 1
+#define DEBUG_OUTPUT 1
 extern Datum recordchange(PG_FUNCTION_ARGS);
 
 PG_FUNCTION_INFO_V1(recordchange);
@@ -399,7 +399,8 @@ getPrimaryKey(Oid tblOid)
 		return NULL;
 	}
 
-	tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+	//tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+	tpResultKey = (int2vector *) PG_DETOAST_DATUM(resDatum);
 	int n=tpResultKey->dim1;
 	resultKey = palloc(Int2VectorSize(n));
 	if (n > 0)
@@ -548,7 +549,8 @@ packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid ta
 		}						/* KeyUsage!=ALL */
 #ifndef  NODROPCOLUMN
                // this comment is for 11
-               if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+               ////if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+		if (TupleDescAttr(tTupleDesc,iColumnCounter-1)->attisdropped)
                //if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
                  {
                    /**
@@ -568,7 +570,8 @@ eksairetea columns
 		}
 
 		// this comment is for 11
-		cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+		////cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+		cpFieldName = NameStr(TupleDescAttr(tTupleDesc,iColumnCounter - 1)->attname );
 		//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
 #if defined DEBUG_OUTPUT
 		elog(NOTICE, "FieldName: %s", cpFieldName);
@@ -1441,7 +1444,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
 		confrelid = (Oid) DatumGetObjectId(resDatum);
 
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		thisCols = (int16 *)ARR_DATA_PTR(arr);
 		numOfCols=ARR_DIMS(arr)[0];
 #if defined DEBUG_OUTPUT
@@ -1506,7 +1510,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
 #endif
 		if (FkHasNullValueORXcluded || !handleit) continue;
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		fkCols = (int16 *)ARR_DATA_PTR(arr);
 		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
 			SPITupleTable *PAR_tupTable;
@@ -1822,7 +1827,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
 		confrelid = (Oid) DatumGetObjectId(resDatum);
 
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		thisCols = (int16 *)ARR_DATA_PTR(arr);
 		numOfCols=ARR_DIMS(arr)[0];
 		for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
@@ -1888,7 +1894,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
 		
 		
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		fkCols = (int16 *)ARR_DATA_PTR(arr);
 		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
 			SPITupleTable *PAR_tupTable;


view thread (2+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected]
  Subject: Re: Postgresql 18beta1 and SPI changes
  In-Reply-To: <[email protected]>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox