public inbox for [email protected]
help / color / mirror / Atom feedRe: Postgresql 18beta1 and SPI changes
2+ messages / 1 participants
[nested] [flat]
* Re: Postgresql 18beta1 and SPI changes
@ 2025-05-12 12:50 Achilleas Mantzios <[email protected]>
2025-05-13 17:12 ` Re: Postgresql 18beta1 and SPI changes Achilleas Mantzios <[email protected]>
0 siblings, 1 reply; 2+ messages in thread
From: Achilleas Mantzios @ 2025-05-12 12:50 UTC (permalink / raw)
To: Tom Lane <[email protected]>; +Cc: [email protected] <[email protected]>; [email protected]
Dear All, Dear Tom
On 5/11/25 16:20, Tom Lane wrote:
> Achilleas Mantzios<[email protected]> writes:
>> We use are own version of DBmirror, we run our replication in a highly
>> fine grained manner. So every upgrade I have to make the code compile
>> and test. Up to PostgreSQL 17, I only got minor compilation problems
>> that I managed to resolve fairly easily. However this didn't prove to be
>> the case with PostgreSQL 18beta1, it proved harder to compile and as my
>> fears were verified, it has serious problems.
>> My question : is 18's SPI stabilized ? Can I start work on our version
>> of DBmirror ? Or wait for 18beta2 or -RC ?
> If you think there are changes we need to make, you'd better get
> specific sooner not later. I'm not aware of any large fixes that
> are pending, cf
>
> https://wiki.postgresql.org/wiki/PostgreSQL_18_Open_Items
I attach
a) our old source (pending.c.orig), as of PostgreSQL 17 (tested for some
7 months, so pretty well tested),
b) the compilation errors when compiled against 18beta1, and
c) the patch that I came up with, which seems (in my minimal testing) to
yield correct results on 18beta1.
The majority of serious warnings have to do with de-toasting arrays and
the PK's int2vector , while the error has to do with getting column
details such as attisdropped and attname.
Please have a look, and share your thoughts. I haven't touched serious C
coding till I first wrote the above sometime in 2004 with a bunch of
additions some years ago.
>
> regards, tom lane
/****************************************************************************
* pending.c
* $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $
*
* This file contains a trigger for Postgresql-7.x to record changes to tables
* to a pending table for mirroring.
* All tables that should be mirrored should have this trigger hooked up to it.
*
* Written by Steven Singer ([email protected])
* (c) 2001-2002 Navtech Systems Support Inc.
* ALL RIGHTS RESERVED
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*
***************************************************************************/
#include <string.h>
#include "postgres.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"
#define Int2VectorSize(n) (offsetof(int2vector, values) + (n) * sizeof(int16))
#define TRUE 1
#define FALSE 0
PG_MODULE_MAGIC;
enum FieldUsage
{
PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};
int storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupdesc,
Oid tableOid,
char cOp,int slaveid, char origOp);
int storexid(void);
int handler(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupdesc,
Oid tableOid,
char cOp, int slaveid, char *pkxpress);
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);*/
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tbaleOid,int iIncludeKeyData);
int2vector *getPrimaryKey(Oid tblOid);
char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
enum FieldUsage eKeyUsage);
bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter);
char *get_namespace_name(Oid nspid);
#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
//#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange);
/*****************************************************************************
* The entry point for the trigger function.
* The Trigger takes a single SQL 'text' argument indicating the name of the
* table the trigger was applied to. If this name is incorrect so will the
* mirroring.
****************************************************************************/
Datum
recordchange(PG_FUNCTION_ARGS)
{
TriggerData *trigdata;
TupleDesc tupdesc;
HeapTuple beforeTuple = NULL;
HeapTuple afterTuple = NULL;
HeapTuple retTuple = NULL;
char *tblname;
char op = 0;
char *schemaname;
char *fullyqualtblname;
char *pkxpress;
if (fcinfo->context != NULL)
{
if (SPI_connect() < 0)
{
elog(NOTICE, "recordchange could not connect to SPI");
return -1;
}
trigdata = (TriggerData *) fcinfo->context;
/* Extract the table name */
tblname = SPI_getrelname(trigdata->tg_relation);
#ifndef NOSCHEMAS
schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation));
fullyqualtblname = palloc(strlen(tblname) +
strlen(schemaname) + 6);
sprintf(fullyqualtblname,"\"%s\".\"%s\"",
schemaname,tblname);
#else
fullyqualtblname = palloc(strlen(tblname) + 3);
sprintf(fullyqualtblname,"\"%s\"",tblname);
#endif
tupdesc = trigdata->tg_relation->rd_att;
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
retTuple = trigdata->tg_newtuple;
beforeTuple = trigdata->tg_trigtuple;
afterTuple = trigdata->tg_newtuple;
op = 'u';
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
retTuple = trigdata->tg_trigtuple;
afterTuple = trigdata->tg_trigtuple;
op = 'i';
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
{
retTuple = trigdata->tg_trigtuple;
beforeTuple = trigdata->tg_trigtuple;
op = 'd';
}
if (storexid()) {
elog(ERROR, "Operation could not be mirrored. storexid problem");
return PointerGetDatum(NULL);
}
pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid);
if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,retTuple->t_tableOid, op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress))
{
/* An error occoured. Skip the operation. */
elog(ERROR, "Operation could not be mirrored");
return PointerGetDatum(NULL);
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Returning on success");
#endif
pfree(fullyqualtblname);
pfree(pkxpress);
SPI_finish();
return PointerGetDatum(retTuple);
}
else
{
/*
* Not being called as a trigger.
*/
return PointerGetDatum(NULL);
}
}
/*****************************************************************************
* stores the current xid in dbmirror_xactions
*****************************************************************************/
int
storexid(void) {
//char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES ($1)";
char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT $1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)";
int iResult = 0;
Datum saPlanData[1];
Oid taPlanArgTypes[1] = {INT4OID};
void *vpPlan;
vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes);
if (vpPlan == NULL)
elog(NOTICE, " storexid Error creating plan");
saPlanData[0] = Int32GetDatum(GetCurrentTransactionId());
iResult = SPI_execp(vpPlan, saPlanData, NULL , 1);
if (iResult < 0) {
elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase, iResult);
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in dbmirror_xactions");
#endif
return 0;
}
/*****************************************************************************
* Constructs and executes an SQL query to write a record of this tuple change
* to the pending table.
*****************************************************************************/
int
storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupDesc,
Oid tableOid,
char cOp, int slaveid, char origOp)
{
char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)";
int iResult = 0;
HeapTuple tCurTuple;
char nulls[5]=" ";
//Points the current tuple(before or after)
Datum saPlanData[5];
Oid taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID, INT4OID, CHAROID};
void *vpPlan;
tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes);
if (vpPlan == NULL)
elog(NOTICE, "Error creating plan");
/* SPI_saveplan(vpPlan); */
saPlanData[0] = PointerGetDatum(cpTableName);
saPlanData[1] = CharGetDatum(cOp);
saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
saPlanData[3] = Int32GetDatum(slaveid);
if (slaveid <=0) nulls[3]='n';
saPlanData[4] = CharGetDatum(origOp);
iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
if (iResult < 0)
elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in pending table");
#endif
if (cOp == 'd')
{
/**
* This is a record of a delete operation.
* Just store the key data.
*/
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
}
else if (cOp == 'i')
{
/**
* An Insert operation.
* Store all data
*/
iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE);
}
else
{
/* op must be an update. */
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE);
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Done storing keyinfo/data");
#endif
return iResult;
}
int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
TupleDesc tTupleDesc, Oid tableOid)
{
Oid saPlanArgTypes[1] = {VARCHAROID};
char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
void *pplan;
Datum saPlanData[1];
char *cpKeyData;
char *cpKeyData_tmp;
int iRetCode;
pplan = SPI_prepare(insQuery, 1, saPlanArgTypes);
if (pplan == NULL)
{
elog(NOTICE, "Could not prepare INSERT plan");
return -1;
}
/* pplan = SPI_saveplan(pplan); */
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, PRIMARY);
if (cpKeyData == NULL)
{
elog(ERROR,"Could not determine primary key data");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "KeyData: %s", cpKeyData);
#endif
cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
saPlanData[0] = PointerGetDatum(cpKeyData_tmp);
iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);
if (cpKeyData != NULL)
pfree(cpKeyData);
if (cpKeyData_tmp != 0)
pfree(cpKeyData_tmp);
if (iRetCode != SPI_OK_INSERT)
{
elog(NOTICE, "Error inserting row in storeKeyInfo");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Insert successful");
#endif
return 0;
}
int2vector *
getPrimaryKey(Oid tblOid)
{
char *queryBase;
char *query;
bool isNull;
int2vector *resultKey;
int2vector *tpResultKey;
HeapTuple resTuple;
Datum resDatum;
int ret;
queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
query = palloc(strlen(queryBase) + MAX_OID_LEN + 1);
sprintf(query, "%s%d", queryBase, tblOid);
ret = SPI_exec(query, 1);
if (ret != SPI_OK_SELECT || SPI_processed != 1)
{
elog(NOTICE, "Could not select primary index key");
return NULL;
}
resTuple = SPI_tuptable->vals[0];
resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull);
if (isNull) {
elog(NOTICE, "PKey is NULL");
return NULL;
}
tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
int n=tpResultKey->dim1;
resultKey = palloc(Int2VectorSize(n));
if (n > 0)
memcpy(resultKey->values, tpResultKey->values, n * sizeof(int16));
SET_VARSIZE(resultKey, Int2VectorSize(n));
resultKey->ndim = 1;
resultKey->dataoffset = 0;
resultKey->elemtype = INT2OID;
resultKey->dim1 = n;
resultKey->lbound1 = 0;
pfree(query);
return resultKey;
}
/******************************************************************************
* Stores a copy of the non-key data for the row.
*****************************************************************************/
int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
{
Oid planArgTypes[1] = {VARCHAROID};
char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
SPIPlanPtr pplan;
Datum planData[1];
char *cpKeyData;
char *cpKeyData_tmp;
int iRetValue;
pplan = SPI_prepare(insQuery, 1, planArgTypes);
if (pplan == NULL)
{
elog(NOTICE, "Could not prepare INSERT plan");
return -1;
}
/* pplan = SPI_saveplan(pplan); */
if (iIncludeKeyData == 0)
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc, tableOid, NONPRIMARY);
else
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, ALL);
cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
planData[0] = PointerGetDatum(cpKeyData_tmp);
iRetValue = SPI_execp(pplan, planData, NULL, 1);
if (cpKeyData != 0)
pfree(cpKeyData);
if (cpKeyData_tmp != 0)
pfree(cpKeyData_tmp);
if (iRetValue != SPI_OK_INSERT)
{
elog(NOTICE, "Error inserting row in storeData");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Insert successful");
#endif
return 0;
}
/**
* Packages the data in tTupleData into a string of the format
* FieldName='value text' where any quotes inside of value text
* are escaped with a backslash and any backslashes in value text
* are esacped by a second back slash.
*
* tTupleDesc should be a description of the tuple stored in
* tTupleData.
*
* eFieldUsage specifies which fields to use.
* PRIMARY implies include only primary key fields.
* NONPRIMARY implies include only non-primary key fields.
* ALL implies include all fields.
*/
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
enum FieldUsage eKeyUsage)
{
int iNumCols;
int2vector *tpPKeys = NULL;
int iColumnCounter;
char *cpDataBlock;
int iDataBlockSize;
int iUsedDataBlock;
iNumCols = tTupleDesc->natts;
if (eKeyUsage != ALL)
{
tpPKeys = getPrimaryKey(tableOid);
if (tpPKeys == NULL)
return NULL;
}
#if defined DEBUG_OUTPUT
if (tpPKeys != NULL)
elog(NOTICE, "Have primary keys");
#endif
cpDataBlock = palloc(BUFFER_SIZE);
iDataBlockSize = BUFFER_SIZE;
iUsedDataBlock = 0; /* To account for the null */
for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
{
int iIsPrimaryKey;
int iPrimaryKeyIndex;
char *cpUnFormatedPtr;
char *cpFormatedPtr;
char *cpFieldName;
char *cpFieldData;
if (eKeyUsage != ALL)
{
/* Determine if this is a primary key or not. */
iIsPrimaryKey = 0;
int16 *tpPKeysV = tpPKeys->values;
int tpPKeysSize = tpPKeys->dim1;
for (iPrimaryKeyIndex = 0; iPrimaryKeyIndex<tpPKeysSize;
iPrimaryKeyIndex++)
{
if (tpPKeysV[iPrimaryKeyIndex] == iColumnCounter)
{
iIsPrimaryKey = 1;
break;
}
}
if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
{
/**
* Don't use.
*/
#if defined DEBUG_OUTPUT
elog(NOTICE, "Skipping column");
#endif
continue;
}
} /* KeyUsage!=ALL */
#ifndef NODROPCOLUMN
// this comment is for 11
if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
//if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
{
/**
* This column has been dropped.
* Do not mirror it.
*/
continue;
}
#endif
/***************************************
Edw mpainei o kwdikas gia elegxo enanti,
tou dbmirror_exclude_attributes, gia
eksairetea columns
****************************************/
if (isExcluded(cpTableName,tTupleDesc,iColumnCounter)) {
continue;
}
// this comment is for 11
cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
}
sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);
cpUnFormatedPtr = cpFieldData;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
if (cpFieldData != NULL)
{
*cpFormatedPtr = '\'';
iUsedDataBlock++;
cpFormatedPtr++;
}
else
{
sprintf(cpFormatedPtr," ");
iUsedDataBlock++;
cpFormatedPtr++;
continue;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldData: %s", cpFieldData);
elog(NOTICE, "Starting format loop");
#endif
while (*cpUnFormatedPtr != 0)
{
while (iDataBlockSize - iUsedDataBlock < 2)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'')
{
*cpFormatedPtr = '\\';
cpFormatedPtr++;
iUsedDataBlock++;
}
*cpFormatedPtr = *cpUnFormatedPtr;
cpFormatedPtr++;
cpUnFormatedPtr++;
iUsedDataBlock++;
}
pfree(cpFieldData);
while (iDataBlockSize - iUsedDataBlock < 3)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
sprintf(cpFormatedPtr, "' ");
iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif
} /* for iColumnCounter */
if (tpPKeys != NULL)
pfree(tpPKeys);
#if defined DEBUG_OUTPUT
elog(NOTICE, "Returning: DataBlockSize:%d iUsedDataBlock:%d",iDataBlockSize,
iUsedDataBlock);
#endif
memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);
return cpDataBlock;
}
bool isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber) {
char *qb1;
char *qb2;
char *q;
char *fieldName;
int ret;
HeapTuple resTuple;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
char *value;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
elog(NOTICE, "fieldName=%s",fieldName);
#endif
qb1 = "SELECT ";
qb2 = " = any(attnames) from dbmirror_exclude_attributes where tblname=";
#if defined DEBUG_OUTPUT
elog(NOTICE, "%s q size=%d",fieldName,strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2);
#endif
/*
to +1 parakatw einai gia to 0 (null)
*/
q = palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
pfree(fieldName);
if (ret != SPI_OK_SELECT || SPI_processed != 1) return FALSE;
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (value == NULL) return FALSE;
if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0) {
pfree(value);
return TRUE;
}
else {
pfree(value);
return FALSE;
}
}
#define MAXKCOLS 32
#define MAXCOL_LEN 128
int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char *pkxpress) {
HeapTuple tuple;
tuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
#if defined DEBUG_OUTPUT
elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d, op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress);
#endif
if (slaveid == 0 && op == 'm') {
elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen");
return -1;
}
/*
ISEXPUNCOND
*/
else if (slaveid == 0) return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
/*
ISEXPWITHSLAVEID
*/
else if (slaveid > 0 && op =='d') {
int retval;
retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
return updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid);
}
else if (slaveid > 0 && op == 'i') {
int retval;
retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
return retval;
}
else if (slaveid > 0 && op == 'u') {
int retval;
int old_slaveid;
old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
retval = handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
if (old_slaveid != -3 && old_slaveid != slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
}
if (old_slaveid != slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
}
else {
retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
}
if (old_slaveid != -3) {
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
}
return retval;
}
else if (slaveid == -3 && (op =='d' || op =='i')) {
return 0;
}
else if (slaveid == -3 && op =='u') {
int retval=0;
int old_slaveid;
old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
if (old_slaveid > 0) {
retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
}
return retval;
}
/*
ISIMPLBACKWARD KALESMENO apo handleParents
*/
else if (slaveid >0 && op == 'm') {
if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) {
return 0;
/*
SE SXOLIO Giati twra mporei enas pateras ISEXPUNCOND/ISEXPWITHSLAVEID na exei paidi ISIMPLBACKWARD
*/
/*
elog(ERROR,"Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen");
*/
}
if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) {
return 0;
/*
*/
}
if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) {
return 0;
}
if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0;
else {
int retval;
retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress);
}
}
/*
ISIMPL* but called from trigger ('i','d','u')
*/
/*
ISIMPLBACKWARD
*/
else if (slaveid == -1) {
if (op == 'i') return 0;
/*
Delete in this fashion may happen for "orphan" rows.
The deletions will be "triggered" by childern deletions/updates.
*/
else if (op == 'd') {
int retval=0;
int *slaves;
int *run;
slaves = getSlaves(cpTableName,pkxpress);
if (slaves == NULL) return 0;
for (run=slaves;*run;run++) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run);
}
pfree(slaves);
return retval || deleteAccnt(cpTableName,pkxpress);
}
else if (op == 'u') {
int retval=0;
int *slaves;
int *run;
slaves = getSlaves(cpTableName,pkxpress);
#if defined DEBUG_OUTPUT
elog(NOTICE,"in slaveid=-1, op='u', slaves=%d",(int)slaves);
#endif
if (slaves == NULL) return 0;
for (run=slaves;*run;run++) {
#if defined DEBUG_OUTPUT
elog(NOTICE,"in slaveid=-1, op='u', runslave=%d",*run);
#endif
retval = retval || handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
retval = retval || updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
}
pfree(slaves);
return retval;
}
}
/*
ISIMPLFORWARD
*/
else if (slaveid == -2) {
if (op == 'i') {
int retval=0;
int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);
if (forw_slaveid>0) {
retval = retval || handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
}
return retval;
}
else if (op == 'd') {
int retval=0;
int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);
if (forw_slaveid>0) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid);
}
return retval ;
}
else if (op == 'u') {
/*
POTE ALLAGH TOU VALUE TOU COLUMN NAME TOU PATH dhl tou COLUMN ME NAME PATHCOL
*/
int retval=0;
int forw_slaveid = getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc);
int old_slaveid = getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc);
//char origop = getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc);
//if (origop == 'i') {
// old_slaveid = forw_slaveid;
//}
//elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' , forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid);
// LEGACY CODE as of 2016-03-18
//retval = retval || handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
//retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid);
//retval = retval || updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
if (forw_slaveid>0) {
retval = handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
if (old_slaveid != -3 && old_slaveid != forw_slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
}
if (old_slaveid != forw_slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op);
}
else {
retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
}
if (old_slaveid != -3) {
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
}
}
else { // ISIMPLFORWARD me vslid IS NULL --> -3
if (old_slaveid > 0) {
retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
}
}
return retval;
}
}
return 0;
}
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *qb;
char *q;
char *foo;
int ret;
HeapTuple resTuple;
int attnum;
int slaveid;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
/*
ISIMPLBACKWARD
*/
if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1;
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
/*
ISEXPUNCOND
*/
if (slave_siteidkeyname == NULL) return 0;
/*
ISIMPLFORWARD
*/
if (strncmp(slave_siteidkeyname,"___",3) == 0) {
pfree(slave_siteidkeyname);
return -2;
}
attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
elog(ERROR,"WRONG slaveidkeyname=%s given for table %s",slave_siteidkeyname,cpTableName);
pfree(slave_siteidkeyname);
return 0;
}
foo=SPI_getvalue(tuple,tupleDesc,attnum);
/*
ISEXPWITHSLAVEID BUT siteidkeyname is null
*/
if (foo == NULL) {
pfree(slave_siteidkeyname);
return -3;
}
pfree(slave_siteidkeyname);
slaveid = atoi(foo);
pfree(foo);
/*
ISEXPWITHSLAVEID
*/
return slaveid;
}
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getComputedSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s where %s='%s'",forw_table,forw_table_id,pathcolval);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
pfree(slave_siteidkeyname);
elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should have row with %s=%s",forw_table,forw_table_id,pathcolval);
return -2;
}
pfree(q);
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",forw_table);
ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
if (ret == -2) {
pfree(slave_siteidkeyname);
ret = getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
pfree(ParTableName);
return ret;
}
if (ret <= 0) {
pfree(slave_siteidkeyname);
pfree(ParTableName);
// elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should have slaveid>0",forw_table);
/***
apopiera xeirismou NULL slaveid, dhl isodynamo ISEXPWITHSLAVEID kai slaveid IS NULL
***/
return -3;
}
pfree(ParTableName);
pfree(slave_siteidkeyname);
return ret;
/*
PROSOXH:
To swsto einai apla sto slave_siteidkeyname na fylame MONO to parent table, kai meta
me query na briskoume to tuple tou parent table.
*/
}
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
/*
consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
an parent's op=='d' tote sigoura parent's origop=='u'
*/
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
slaveid_d = -3;
}
else {
slaveid_d = atoi(foo);
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return slaveid_d;
}
/* dirty patch code to handle copy supplycase. it was supposed to solve the problem of not turning 'u' to 'i' for e.g supplycasesquotes */
/* but it doesnt work for fb_ , as it turns 'i' to 'u' as per : dbmirror issues when changing vslid (null->int, int->null, intX->intY) */
/* following code must be here (not commended) as of 2018-12-20 */
/* the aim is make everything work correctly, supplies + fb_ + all type ISIMPLFORWARD (-2) tables */
/*
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",cpTableName,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
slaveid_d = -3;
}
else {
slaveid_d = atoi(foo);
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return slaveid_d;
}
*/
/**
pls consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
if (parent op=='i' and origop <> 'i' exist) -pou simainei oti origop='u' kai (eite slaveid NULL -> NOT NULL eite X -> Y, X!=Y) - we consider this as an insert and return -3 in order to communicate this to handler().
NOTE, this means that it is ILLEGAL for the app to do UPDATE on the parent, then update on the kid, and then perform a second UPDATE on the kid, this will result in TWO inserts and produce an duplicate ERROR.
else - diladi origop == 'i' or no parent op=='i' exists - pou simainei : (
eite parent origop == 'i' (synepagetai oti parent op == 'i')
eite parent op != 'i' : (synepagetai :
eite parent op == 'd' opote den tha ftasei edo o kwdikas, logw tou 1ou statement pio pano
eite parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) me X==Y )
)
)
then we let the code fallback to return getComputedSlaveId(cpTableName,tuple,tupleDesc) which will return forw_slaveid .
**/
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
pfree(slave_siteidkeyname);
pfree(pathcolval);
return -3;
}
pfree(slave_siteidkeyname);
pfree(pathcolval);
return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */
/*
* PROYPOTHETOUME OTI O AMESOS BABAS exei entry sto dbmirror_pending me to idio xid kai op='d'
*/
}
/*
char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
char origop;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
*/
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
/*
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
q = palloc(512);
sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
pfree(q);
pfree(pathcolval);
elog(ERROR,"WRONG forward parent table %s has null origop in this transaction",forw_table);
}
else {
origop = foo[0];
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return origop;
}
return ' ';
}
*/
#define MAX_WHERE_CLAUSE 512
#define MAX_QUERY_LEN 512
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
char *qb;
char *q;
bool isNull;
int ret;
int i;
HeapTuple resTuple;
Datum resDatum;
Oid confrelid;
char *confrelname;
int16 *thisCols;
int16 *fkCols;
int16 *run;
ArrayType *arr;
char AthisColsVals[MAXKCOLS][MAXCOL_LEN];
char fkColsNames[MAXKCOLS][MAXCOL_LEN];
char thisColsTypes[MAXKCOLS][100];
short numOfCols;
SPITupleTable *FK_tupTable;
int FK_processed;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in handleParents of Table=%d",tableOid);
#endif
qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
q = palloc(strlen(qb)+MAX_OID_LEN+1);
sprintf(q,"%s%d",qb,tableOid);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed < 0) {
elog(NOTICE, "no FKs");
return 0;
}
/*
For every FK dependency we track and handle the parent table
*/
FK_tupTable = SPI_tuptable;
FK_processed = SPI_processed;
for (i=0;i<FK_processed;i++) {
char *WHERE;
char *ParTableName;
int j;
char FkHasNullValueORXcluded=0;
char handleit=0;
SPITupleTable *RealPar_tupTable;
int RealPar_processed;
HeapTuple RealPar_tuple;
int colrun;
SPITupleTable *thisatt_tupTable;
int thisatt_processed;
HeapTuple thisatt_tuple;
resTuple = FK_tupTable->vals[i];
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
int DIM_0 = ARR_DIMS(arr)[0];
int LBOUND_0 = ARR_LBOUND(arr)[0];
// resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
// confrelname = (char *) DatumGetName(resDatum);
// elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0);
#endif
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
char *fooname;
q = palloc(512);
sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
return -2;
}
pfree(q);
thisatt_processed = SPI_processed;
thisatt_tupTable = SPI_tuptable;
thisatt_tuple = thisatt_tupTable->vals[0];
fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s",fooname,*run,thiscolrun,value);
#endif
if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun)) {
FkHasNullValueORXcluded=1;
break;
}
memcpy(&(AthisColsVals[colrun][0]),value,strlen(value)+1);
memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
if (Btuple != NULL) {
char *Bvalue = SPI_getvalue(Btuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s,Before value=%s",fooname,*run,thiscolrun,value,Bvalue);
#endif
if (Bvalue == NULL) handleit=1;
else if (strcmp(Bvalue,value)) {
handleit=1;
pfree(Bvalue);
}
}
else handleit=1;
pfree(value);
}
#if defined DEBUG_OUTPUT
elog(NOTICE,"in handleParents--> END OF ATTR LOOP");
elog(NOTICE,"handle it = %d, ",handleit);
elog(NOTICE,"FkHasNullValueORXcluded = %d, ",FkHasNullValueORXcluded);
#endif
if (FkHasNullValueORXcluded || !handleit) continue;
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
int PAR_processed;
HeapTuple PAR_resTuple;
char *value;
q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal Error:no PK for FK relation");
return -2;
}
pfree(q);
PAR_tupTable = SPI_tuptable;
PAR_processed = SPI_processed;
PAR_resTuple = PAR_tupTable->vals[0];
value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
pfree(value);
}
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
confrelname = (char *) DatumGetName(resDatum);
/*
There must be at least 1 column in compound FK!!
*/
WHERE = palloc(MAX_WHERE_CLAUSE);
if (!strncmp(thisColsTypes[0],"int",3))
sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]);
else
sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]);
for (j=1;j<numOfCols;j++) {
if (!strncmp(thisColsTypes[j],"int",3))
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],AthisColsVals[j]);
else
sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],AthisColsVals[j]);
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
//elog(NOTICE,"handleParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found FK, SQL=%s",q);
#endif
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: par table %s should have row with %s",confrelname,WHERE);
return -2;
}
pfree(q);
RealPar_processed = SPI_processed;
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE);
pfree(ParTableName);
pfree(WHERE);
}
return 0;
}
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
/*
sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0;
else return 1;
*/
sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0;
else return 1;
}
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
bool isNull;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
q = palloc(MAX_QUERY_LEN);
sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull));
}
int createAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in createAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
/*
public.dbmirror_accounting.cnt DEFAULT = 1
*/
sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2;
else return 0;
}
char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) {
int2vector *pk;
int i;
char *WHERE = palloc(MAX_WHERE_CLAUSE);
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getPKxpress of Table=%s",cpTableName);
#endif
pk = getPrimaryKey(tableOid);
if (pk == NULL) return NULL;
int16 *pkV = pk->values;
int pkSize = pk->dim1;
if (pkSize>0) {
char *keyname;
char *keyval;
keyname = SPI_fname(tupleDesc,pkV[0]);
if (keyname == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[0],cpTableName);
return NULL;
}
keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]);
if (keyval == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[0],cpTableName);
return NULL;
}
sprintf(WHERE,"\"%s\"=%s",keyname,keyval);
pfree(keyname);
pfree(keyval);
}
for (i=1;i<pkSize;i++) {
char *keyname;
char *keyval;
keyname = SPI_fname(tupleDesc,pkV[i]);
if (keyname == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[i],cpTableName);
return NULL;
}
keyval = SPI_getvalue(tuple,tupleDesc,pkV[i]);
if (keyval == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[i],cpTableName);
return NULL;
}
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,keyname,keyval);
pfree(keyname);
pfree(keyval);
}
pfree(pk);
return WHERE;
}
int *getSlaves(char *cpTableName,char *pkxpress) {
char *q;
int ret;
int SLAVE_processed;
SPITupleTable *SLAVE_tupTable;
int *slaves;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getSlaves of Table=%s, pkxpress=%s",cpTableName,pkxpress);
#endif
if (pkxpress == NULL) return NULL;
q = palloc(MAX_QUERY_LEN);
sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL;
SLAVE_tupTable = SPI_tuptable;
SLAVE_processed = SPI_processed;
slaves = palloc((SLAVE_processed+1) * sizeof(int));
#if defined DEBUG_OUTPUT
elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed);
#endif
for (ret=0;ret<SLAVE_processed;ret++) {
char *slaveidStr = SPI_getvalue(SLAVE_tupTable->vals[ret],SLAVE_tupTable->tupdesc,1);
*(slaves+ret) = atoi(slaveidStr);
pfree(slaveidStr);
}
*(slaves+SLAVE_processed) = 0;
return slaves;
}
int deleteAccnt (char *cpTableName, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
ret = SPI_exec(q,0);
pfree(q);
if (ret == SPI_OK_DELETE) return 0;
else return -2;
}
int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,0);
pfree(q);
if (ret == SPI_OK_DELETE) return 0;
else return -2;
}
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
char *qb;
char *q;
bool isNull;
int ret;
int i;
HeapTuple resTuple;
Datum resDatum;
Oid confrelid;
char *confrelname;
int16 *thisCols;
int16 *fkCols;
int16 *run;
ArrayType *arr;
char BthisColsVals[MAXKCOLS][MAXCOL_LEN];
char fkColsNames[MAXKCOLS][MAXCOL_LEN];
char thisColsTypes[MAXKCOLS][100];
short numOfCols;
SPITupleTable *FK_tupTable;
int FK_processed;
int ParSlaveId;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid);
#endif
qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
q = palloc(strlen(qb)+MAX_OID_LEN+1);
sprintf(q,"%s%d",qb,tableOid);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed < 0) {
elog(NOTICE, "no FKs");
return 0;
}
/*
For every FK dependency we track and handle the parent table
*/
FK_tupTable = SPI_tuptable;
FK_processed = SPI_processed;
for (i=0;i<FK_processed;i++) {
char *WHERE;
char *ParTableName;
int j;
char FkHasNullValueORXcluded=0;
char decreaseit=0;
SPITupleTable *RealPar_tupTable;
int RealPar_processed;
HeapTuple RealPar_tuple;
int colrun;
SPITupleTable *thisatt_tupTable;
int thisatt_processed;
HeapTuple thisatt_tuple;
resTuple = FK_tupTable->vals[i];
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
char *fooname;
q = palloc(512);
sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
return -2;
}
pfree(q);
thisatt_processed = SPI_processed;
thisatt_tupTable = SPI_tuptable;
thisatt_tuple = thisatt_tupTable->vals[0];
fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s",fooname,*run,thiscolrun,value);
#endif
if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun)) {
FkHasNullValueORXcluded=1;
break;
/*
Nothing is done regarding "Before" status of parent since there is not one
*/
}
memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1);
memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
if (Atuple != NULL) {
char *Avalue = SPI_getvalue(Atuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s,After value=%s",fooname,*run,thiscolrun,value,Avalue);
#endif
if (Avalue == NULL) decreaseit=1;
else if (strcmp(Avalue,value)) {
decreaseit=1;
pfree(Avalue);
}
}
else decreaseit=1; /* is delete operation*/
pfree(value);
}
#if defined DEBUG_OUTPUT
elog(NOTICE,"in updateAccnParents--> END OF ATTR LOOP");
elog(NOTICE,"decrease it = %d, ",decreaseit);
elog(NOTICE,"FkHasNullValueORXcluded = %d, ",FkHasNullValueORXcluded);
#endif
if (FkHasNullValueORXcluded || !decreaseit) continue;
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
int PAR_processed;
HeapTuple PAR_resTuple;
char *value;
q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal Error:no PK for FK relation");
return -2;
}
pfree(q);
PAR_tupTable = SPI_tuptable;
PAR_processed = SPI_processed;
PAR_resTuple = PAR_tupTable->vals[0];
value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
pfree(value);
}
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
confrelname = (char *) DatumGetName(resDatum);
/*
there must be at least 1 column in compound FK!!
*/
WHERE = palloc(MAX_WHERE_CLAUSE);
if (!strncmp(thisColsTypes[0],"int",3))
sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]);
else
sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]);
for (j=1;j<numOfCols;j++) {
if (!strncmp(thisColsTypes[j],"int",3))
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],BthisColsVals[j]);
else
sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],BthisColsVals[j]);
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
//elog(NOTICE,"updateAccntParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found FK, SQL=%s",q);
#endif
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: updateAcctParents: par table %s should have row with %s",confrelname,WHERE);
return -2;
}
pfree(q);
RealPar_processed = SPI_processed;
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
ParSlaveId = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) {
pfree(ParTableName);
continue;
}
if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) {
ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f');
ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE);
ret = updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid);
}
pfree(ParTableName);
pfree(WHERE);
}
return 0;
}
Attachments:
[text/plain] pending.c.orig (60.8K, 3-pending.c.orig)
download | inline:
/****************************************************************************
* pending.c
* $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $
*
* This file contains a trigger for Postgresql-7.x to record changes to tables
* to a pending table for mirroring.
* All tables that should be mirrored should have this trigger hooked up to it.
*
* Written by Steven Singer ([email protected])
* (c) 2001-2002 Navtech Systems Support Inc.
* ALL RIGHTS RESERVED
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*
***************************************************************************/
#include <string.h>
#include "postgres.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"
#define Int2VectorSize(n) (offsetof(int2vector, values) + (n) * sizeof(int16))
#define TRUE 1
#define FALSE 0
PG_MODULE_MAGIC;
enum FieldUsage
{
PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};
int storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupdesc,
Oid tableOid,
char cOp,int slaveid, char origOp);
int storexid(void);
int handler(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupdesc,
Oid tableOid,
char cOp, int slaveid, char *pkxpress);
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);*/
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tbaleOid,int iIncludeKeyData);
int2vector *getPrimaryKey(Oid tblOid);
char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
enum FieldUsage eKeyUsage);
bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter);
char *get_namespace_name(Oid nspid);
#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
//#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange);
/*****************************************************************************
* The entry point for the trigger function.
* The Trigger takes a single SQL 'text' argument indicating the name of the
* table the trigger was applied to. If this name is incorrect so will the
* mirroring.
****************************************************************************/
Datum
recordchange(PG_FUNCTION_ARGS)
{
TriggerData *trigdata;
TupleDesc tupdesc;
HeapTuple beforeTuple = NULL;
HeapTuple afterTuple = NULL;
HeapTuple retTuple = NULL;
char *tblname;
char op = 0;
char *schemaname;
char *fullyqualtblname;
char *pkxpress;
if (fcinfo->context != NULL)
{
if (SPI_connect() < 0)
{
elog(NOTICE, "recordchange could not connect to SPI");
return -1;
}
trigdata = (TriggerData *) fcinfo->context;
/* Extract the table name */
tblname = SPI_getrelname(trigdata->tg_relation);
#ifndef NOSCHEMAS
schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation));
fullyqualtblname = palloc(strlen(tblname) +
strlen(schemaname) + 6);
sprintf(fullyqualtblname,"\"%s\".\"%s\"",
schemaname,tblname);
#else
fullyqualtblname = palloc(strlen(tblname) + 3);
sprintf(fullyqualtblname,"\"%s\"",tblname);
#endif
tupdesc = trigdata->tg_relation->rd_att;
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
retTuple = trigdata->tg_newtuple;
beforeTuple = trigdata->tg_trigtuple;
afterTuple = trigdata->tg_newtuple;
op = 'u';
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
retTuple = trigdata->tg_trigtuple;
afterTuple = trigdata->tg_trigtuple;
op = 'i';
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
{
retTuple = trigdata->tg_trigtuple;
beforeTuple = trigdata->tg_trigtuple;
op = 'd';
}
if (storexid()) {
elog(ERROR, "Operation could not be mirrored. storexid problem");
return PointerGetDatum(NULL);
}
pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid);
if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,retTuple->t_tableOid, op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress))
{
/* An error occoured. Skip the operation. */
elog(ERROR, "Operation could not be mirrored");
return PointerGetDatum(NULL);
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Returning on success");
#endif
pfree(fullyqualtblname);
pfree(pkxpress);
SPI_finish();
return PointerGetDatum(retTuple);
}
else
{
/*
* Not being called as a trigger.
*/
return PointerGetDatum(NULL);
}
}
/*****************************************************************************
* stores the current xid in dbmirror_xactions
*****************************************************************************/
int
storexid(void) {
//char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES ($1)";
char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT $1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)";
int iResult = 0;
Datum saPlanData[1];
Oid taPlanArgTypes[1] = {INT4OID};
void *vpPlan;
vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes);
if (vpPlan == NULL)
elog(NOTICE, " storexid Error creating plan");
saPlanData[0] = Int32GetDatum(GetCurrentTransactionId());
iResult = SPI_execp(vpPlan, saPlanData, NULL , 1);
if (iResult < 0) {
elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase, iResult);
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in dbmirror_xactions");
#endif
return 0;
}
/*****************************************************************************
* Constructs and executes an SQL query to write a record of this tuple change
* to the pending table.
*****************************************************************************/
int
storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupDesc,
Oid tableOid,
char cOp, int slaveid, char origOp)
{
char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)";
int iResult = 0;
HeapTuple tCurTuple;
char nulls[5]=" ";
//Points the current tuple(before or after)
Datum saPlanData[5];
Oid taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID, INT4OID, CHAROID};
void *vpPlan;
tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes);
if (vpPlan == NULL)
elog(NOTICE, "Error creating plan");
/* SPI_saveplan(vpPlan); */
saPlanData[0] = PointerGetDatum(cpTableName);
saPlanData[1] = CharGetDatum(cOp);
saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
saPlanData[3] = Int32GetDatum(slaveid);
if (slaveid <=0) nulls[3]='n';
saPlanData[4] = CharGetDatum(origOp);
iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
if (iResult < 0)
elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in pending table");
#endif
if (cOp == 'd')
{
/**
* This is a record of a delete operation.
* Just store the key data.
*/
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
}
else if (cOp == 'i')
{
/**
* An Insert operation.
* Store all data
*/
iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE);
}
else
{
/* op must be an update. */
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE);
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Done storing keyinfo/data");
#endif
return iResult;
}
int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
TupleDesc tTupleDesc, Oid tableOid)
{
Oid saPlanArgTypes[1] = {VARCHAROID};
char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
void *pplan;
Datum saPlanData[1];
char *cpKeyData;
char *cpKeyData_tmp;
int iRetCode;
pplan = SPI_prepare(insQuery, 1, saPlanArgTypes);
if (pplan == NULL)
{
elog(NOTICE, "Could not prepare INSERT plan");
return -1;
}
/* pplan = SPI_saveplan(pplan); */
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, PRIMARY);
if (cpKeyData == NULL)
{
elog(ERROR,"Could not determine primary key data");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "KeyData: %s", cpKeyData);
#endif
cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
saPlanData[0] = PointerGetDatum(cpKeyData_tmp);
iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);
if (cpKeyData != NULL)
pfree(cpKeyData);
if (cpKeyData_tmp != 0)
pfree(cpKeyData_tmp);
if (iRetCode != SPI_OK_INSERT)
{
elog(NOTICE, "Error inserting row in storeKeyInfo");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Insert successful");
#endif
return 0;
}
int2vector *
getPrimaryKey(Oid tblOid)
{
char *queryBase;
char *query;
bool isNull;
int2vector *resultKey;
int2vector *tpResultKey;
HeapTuple resTuple;
Datum resDatum;
int ret;
queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
query = palloc(strlen(queryBase) + MAX_OID_LEN + 1);
sprintf(query, "%s%d", queryBase, tblOid);
ret = SPI_exec(query, 1);
if (ret != SPI_OK_SELECT || SPI_processed != 1)
{
elog(NOTICE, "Could not select primary index key");
return NULL;
}
resTuple = SPI_tuptable->vals[0];
resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull);
if (isNull) {
elog(NOTICE, "PKey is NULL");
return NULL;
}
tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
int n=tpResultKey->dim1;
resultKey = palloc(Int2VectorSize(n));
if (n > 0)
memcpy(resultKey->values, tpResultKey->values, n * sizeof(int16));
SET_VARSIZE(resultKey, Int2VectorSize(n));
resultKey->ndim = 1;
resultKey->dataoffset = 0;
resultKey->elemtype = INT2OID;
resultKey->dim1 = n;
resultKey->lbound1 = 0;
pfree(query);
return resultKey;
}
/******************************************************************************
* Stores a copy of the non-key data for the row.
*****************************************************************************/
int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
{
Oid planArgTypes[1] = {VARCHAROID};
char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
SPIPlanPtr pplan;
Datum planData[1];
char *cpKeyData;
char *cpKeyData_tmp;
int iRetValue;
pplan = SPI_prepare(insQuery, 1, planArgTypes);
if (pplan == NULL)
{
elog(NOTICE, "Could not prepare INSERT plan");
return -1;
}
/* pplan = SPI_saveplan(pplan); */
if (iIncludeKeyData == 0)
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc, tableOid, NONPRIMARY);
else
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, ALL);
cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
planData[0] = PointerGetDatum(cpKeyData_tmp);
iRetValue = SPI_execp(pplan, planData, NULL, 1);
if (cpKeyData != 0)
pfree(cpKeyData);
if (cpKeyData_tmp != 0)
pfree(cpKeyData_tmp);
if (iRetValue != SPI_OK_INSERT)
{
elog(NOTICE, "Error inserting row in storeData");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Insert successful");
#endif
return 0;
}
/**
* Packages the data in tTupleData into a string of the format
* FieldName='value text' where any quotes inside of value text
* are escaped with a backslash and any backslashes in value text
* are esacped by a second back slash.
*
* tTupleDesc should be a description of the tuple stored in
* tTupleData.
*
* eFieldUsage specifies which fields to use.
* PRIMARY implies include only primary key fields.
* NONPRIMARY implies include only non-primary key fields.
* ALL implies include all fields.
*/
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
enum FieldUsage eKeyUsage)
{
int iNumCols;
int2vector *tpPKeys = NULL;
int iColumnCounter;
char *cpDataBlock;
int iDataBlockSize;
int iUsedDataBlock;
iNumCols = tTupleDesc->natts;
if (eKeyUsage != ALL)
{
tpPKeys = getPrimaryKey(tableOid);
if (tpPKeys == NULL)
return NULL;
}
#if defined DEBUG_OUTPUT
if (tpPKeys != NULL)
elog(NOTICE, "Have primary keys");
#endif
cpDataBlock = palloc(BUFFER_SIZE);
iDataBlockSize = BUFFER_SIZE;
iUsedDataBlock = 0; /* To account for the null */
for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
{
int iIsPrimaryKey;
int iPrimaryKeyIndex;
char *cpUnFormatedPtr;
char *cpFormatedPtr;
char *cpFieldName;
char *cpFieldData;
if (eKeyUsage != ALL)
{
/* Determine if this is a primary key or not. */
iIsPrimaryKey = 0;
int16 *tpPKeysV = tpPKeys->values;
int tpPKeysSize = tpPKeys->dim1;
for (iPrimaryKeyIndex = 0; iPrimaryKeyIndex<tpPKeysSize;
iPrimaryKeyIndex++)
{
if (tpPKeysV[iPrimaryKeyIndex] == iColumnCounter)
{
iIsPrimaryKey = 1;
break;
}
}
if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
{
/**
* Don't use.
*/
#if defined DEBUG_OUTPUT
elog(NOTICE, "Skipping column");
#endif
continue;
}
} /* KeyUsage!=ALL */
#ifndef NODROPCOLUMN
// this comment is for 11
if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
//if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
{
/**
* This column has been dropped.
* Do not mirror it.
*/
continue;
}
#endif
/***************************************
Edw mpainei o kwdikas gia elegxo enanti,
tou dbmirror_exclude_attributes, gia
eksairetea columns
****************************************/
if (isExcluded(cpTableName,tTupleDesc,iColumnCounter)) {
continue;
}
// this comment is for 11
cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
}
sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);
cpUnFormatedPtr = cpFieldData;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
if (cpFieldData != NULL)
{
*cpFormatedPtr = '\'';
iUsedDataBlock++;
cpFormatedPtr++;
}
else
{
sprintf(cpFormatedPtr," ");
iUsedDataBlock++;
cpFormatedPtr++;
continue;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldData: %s", cpFieldData);
elog(NOTICE, "Starting format loop");
#endif
while (*cpUnFormatedPtr != 0)
{
while (iDataBlockSize - iUsedDataBlock < 2)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'')
{
*cpFormatedPtr = '\\';
cpFormatedPtr++;
iUsedDataBlock++;
}
*cpFormatedPtr = *cpUnFormatedPtr;
cpFormatedPtr++;
cpUnFormatedPtr++;
iUsedDataBlock++;
}
pfree(cpFieldData);
while (iDataBlockSize - iUsedDataBlock < 3)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
sprintf(cpFormatedPtr, "' ");
iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif
} /* for iColumnCounter */
if (tpPKeys != NULL)
pfree(tpPKeys);
#if defined DEBUG_OUTPUT
elog(NOTICE, "Returning: DataBlockSize:%d iUsedDataBlock:%d",iDataBlockSize,
iUsedDataBlock);
#endif
memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);
return cpDataBlock;
}
bool isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber) {
char *qb1;
char *qb2;
char *q;
char *fieldName;
int ret;
HeapTuple resTuple;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
char *value;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
elog(NOTICE, "fieldName=%s",fieldName);
#endif
qb1 = "SELECT ";
qb2 = " = any(attnames) from dbmirror_exclude_attributes where tblname=";
#if defined DEBUG_OUTPUT
elog(NOTICE, "%s q size=%d",fieldName,strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2);
#endif
/*
to +1 parakatw einai gia to 0 (null)
*/
q = palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
pfree(fieldName);
if (ret != SPI_OK_SELECT || SPI_processed != 1) return FALSE;
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (value == NULL) return FALSE;
if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0) {
pfree(value);
return TRUE;
}
else {
pfree(value);
return FALSE;
}
}
#define MAXKCOLS 32
#define MAXCOL_LEN 128
int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char *pkxpress) {
HeapTuple tuple;
tuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
#if defined DEBUG_OUTPUT
elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d, op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress);
#endif
if (slaveid == 0 && op == 'm') {
elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen");
return -1;
}
/*
ISEXPUNCOND
*/
else if (slaveid == 0) return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
/*
ISEXPWITHSLAVEID
*/
else if (slaveid > 0 && op =='d') {
int retval;
retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
return updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid);
}
else if (slaveid > 0 && op == 'i') {
int retval;
retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
return retval;
}
else if (slaveid > 0 && op == 'u') {
int retval;
int old_slaveid;
old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
retval = handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
if (old_slaveid != -3 && old_slaveid != slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
}
if (old_slaveid != slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
}
else {
retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
}
if (old_slaveid != -3) {
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
}
return retval;
}
else if (slaveid == -3 && (op =='d' || op =='i')) {
return 0;
}
else if (slaveid == -3 && op =='u') {
int retval=0;
int old_slaveid;
old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
if (old_slaveid > 0) {
retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
}
return retval;
}
/*
ISIMPLBACKWARD KALESMENO apo handleParents
*/
else if (slaveid >0 && op == 'm') {
if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) {
return 0;
/*
SE SXOLIO Giati twra mporei enas pateras ISEXPUNCOND/ISEXPWITHSLAVEID na exei paidi ISIMPLBACKWARD
*/
/*
elog(ERROR,"Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen");
*/
}
if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) {
return 0;
/*
*/
}
if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) {
return 0;
}
if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0;
else {
int retval;
retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress);
}
}
/*
ISIMPL* but called from trigger ('i','d','u')
*/
/*
ISIMPLBACKWARD
*/
else if (slaveid == -1) {
if (op == 'i') return 0;
/*
Delete in this fashion may happen for "orphan" rows.
The deletions will be "triggered" by childern deletions/updates.
*/
else if (op == 'd') {
int retval=0;
int *slaves;
int *run;
slaves = getSlaves(cpTableName,pkxpress);
if (slaves == NULL) return 0;
for (run=slaves;*run;run++) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run);
}
pfree(slaves);
return retval || deleteAccnt(cpTableName,pkxpress);
}
else if (op == 'u') {
int retval=0;
int *slaves;
int *run;
slaves = getSlaves(cpTableName,pkxpress);
#if defined DEBUG_OUTPUT
elog(NOTICE,"in slaveid=-1, op='u', slaves=%d",(int)slaves);
#endif
if (slaves == NULL) return 0;
for (run=slaves;*run;run++) {
#if defined DEBUG_OUTPUT
elog(NOTICE,"in slaveid=-1, op='u', runslave=%d",*run);
#endif
retval = retval || handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
retval = retval || updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
}
pfree(slaves);
return retval;
}
}
/*
ISIMPLFORWARD
*/
else if (slaveid == -2) {
if (op == 'i') {
int retval=0;
int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);
if (forw_slaveid>0) {
retval = retval || handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
}
return retval;
}
else if (op == 'd') {
int retval=0;
int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc);
if (forw_slaveid>0) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid);
}
return retval ;
}
else if (op == 'u') {
/*
POTE ALLAGH TOU VALUE TOU COLUMN NAME TOU PATH dhl tou COLUMN ME NAME PATHCOL
*/
int retval=0;
int forw_slaveid = getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc);
int old_slaveid = getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc);
//char origop = getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc);
//if (origop == 'i') {
// old_slaveid = forw_slaveid;
//}
//elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' , forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid);
// LEGACY CODE as of 2016-03-18
//retval = retval || handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
//retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid);
//retval = retval || updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
if (forw_slaveid>0) {
retval = handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
if (old_slaveid != -3 && old_slaveid != forw_slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
}
if (old_slaveid != forw_slaveid) {
retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op);
}
else {
retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
}
if (old_slaveid != -3) {
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
}
}
else { // ISIMPLFORWARD me vslid IS NULL --> -3
if (old_slaveid > 0) {
retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
}
}
return retval;
}
}
return 0;
}
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *qb;
char *q;
char *foo;
int ret;
HeapTuple resTuple;
int attnum;
int slaveid;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
/*
ISIMPLBACKWARD
*/
if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1;
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
/*
ISEXPUNCOND
*/
if (slave_siteidkeyname == NULL) return 0;
/*
ISIMPLFORWARD
*/
if (strncmp(slave_siteidkeyname,"___",3) == 0) {
pfree(slave_siteidkeyname);
return -2;
}
attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
elog(ERROR,"WRONG slaveidkeyname=%s given for table %s",slave_siteidkeyname,cpTableName);
pfree(slave_siteidkeyname);
return 0;
}
foo=SPI_getvalue(tuple,tupleDesc,attnum);
/*
ISEXPWITHSLAVEID BUT siteidkeyname is null
*/
if (foo == NULL) {
pfree(slave_siteidkeyname);
return -3;
}
pfree(slave_siteidkeyname);
slaveid = atoi(foo);
pfree(foo);
/*
ISEXPWITHSLAVEID
*/
return slaveid;
}
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getComputedSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s where %s='%s'",forw_table,forw_table_id,pathcolval);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
pfree(slave_siteidkeyname);
elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should have row with %s=%s",forw_table,forw_table_id,pathcolval);
return -2;
}
pfree(q);
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",forw_table);
ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
if (ret == -2) {
pfree(slave_siteidkeyname);
ret = getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
pfree(ParTableName);
return ret;
}
if (ret <= 0) {
pfree(slave_siteidkeyname);
pfree(ParTableName);
// elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should have slaveid>0",forw_table);
/***
apopiera xeirismou NULL slaveid, dhl isodynamo ISEXPWITHSLAVEID kai slaveid IS NULL
***/
return -3;
}
pfree(ParTableName);
pfree(slave_siteidkeyname);
return ret;
/*
PROSOXH:
To swsto einai apla sto slave_siteidkeyname na fylame MONO to parent table, kai meta
me query na briskoume to tuple tou parent table.
*/
}
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
/*
consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
an parent's op=='d' tote sigoura parent's origop=='u'
*/
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
slaveid_d = -3;
}
else {
slaveid_d = atoi(foo);
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return slaveid_d;
}
/* dirty patch code to handle copy supplycase. it was supposed to solve the problem of not turning 'u' to 'i' for e.g supplycasesquotes */
/* but it doesnt work for fb_ , as it turns 'i' to 'u' as per : dbmirror issues when changing vslid (null->int, int->null, intX->intY) */
/* following code must be here (not commended) as of 2018-12-20 */
/* the aim is make everything work correctly, supplies + fb_ + all type ISIMPLFORWARD (-2) tables */
/*
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",cpTableName,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
slaveid_d = -3;
}
else {
slaveid_d = atoi(foo);
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return slaveid_d;
}
*/
/**
pls consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
if (parent op=='i' and origop <> 'i' exist) -pou simainei oti origop='u' kai (eite slaveid NULL -> NOT NULL eite X -> Y, X!=Y) - we consider this as an insert and return -3 in order to communicate this to handler().
NOTE, this means that it is ILLEGAL for the app to do UPDATE on the parent, then update on the kid, and then perform a second UPDATE on the kid, this will result in TWO inserts and produce an duplicate ERROR.
else - diladi origop == 'i' or no parent op=='i' exists - pou simainei : (
eite parent origop == 'i' (synepagetai oti parent op == 'i')
eite parent op != 'i' : (synepagetai :
eite parent op == 'd' opote den tha ftasei edo o kwdikas, logw tou 1ou statement pio pano
eite parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) me X==Y )
)
)
then we let the code fallback to return getComputedSlaveId(cpTableName,tuple,tupleDesc) which will return forw_slaveid .
**/
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
pfree(slave_siteidkeyname);
pfree(pathcolval);
return -3;
}
pfree(slave_siteidkeyname);
pfree(pathcolval);
return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */
/*
* PROYPOTHETOUME OTI O AMESOS BABAS exei entry sto dbmirror_pending me to idio xid kai op='d'
*/
}
/*
char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
char origop;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
*/
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
/*
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
q = palloc(512);
sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
pfree(q);
pfree(pathcolval);
elog(ERROR,"WRONG forward parent table %s has null origop in this transaction",forw_table);
}
else {
origop = foo[0];
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return origop;
}
return ' ';
}
*/
#define MAX_WHERE_CLAUSE 512
#define MAX_QUERY_LEN 512
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
char *qb;
char *q;
bool isNull;
int ret;
int i;
HeapTuple resTuple;
Datum resDatum;
Oid confrelid;
char *confrelname;
int16 *thisCols;
int16 *fkCols;
int16 *run;
ArrayType *arr;
char AthisColsVals[MAXKCOLS][MAXCOL_LEN];
char fkColsNames[MAXKCOLS][MAXCOL_LEN];
char thisColsTypes[MAXKCOLS][100];
short numOfCols;
SPITupleTable *FK_tupTable;
int FK_processed;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in handleParents of Table=%d",tableOid);
#endif
qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
q = palloc(strlen(qb)+MAX_OID_LEN+1);
sprintf(q,"%s%d",qb,tableOid);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed < 0) {
elog(NOTICE, "no FKs");
return 0;
}
/*
For every FK dependency we track and handle the parent table
*/
FK_tupTable = SPI_tuptable;
FK_processed = SPI_processed;
for (i=0;i<FK_processed;i++) {
char *WHERE;
char *ParTableName;
int j;
char FkHasNullValueORXcluded=0;
char handleit=0;
SPITupleTable *RealPar_tupTable;
int RealPar_processed;
HeapTuple RealPar_tuple;
int colrun;
SPITupleTable *thisatt_tupTable;
int thisatt_processed;
HeapTuple thisatt_tuple;
resTuple = FK_tupTable->vals[i];
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
int DIM_0 = ARR_DIMS(arr)[0];
int LBOUND_0 = ARR_LBOUND(arr)[0];
// resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
// confrelname = (char *) DatumGetName(resDatum);
// elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0);
#endif
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
char *fooname;
q = palloc(512);
sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
return -2;
}
pfree(q);
thisatt_processed = SPI_processed;
thisatt_tupTable = SPI_tuptable;
thisatt_tuple = thisatt_tupTable->vals[0];
fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s",fooname,*run,thiscolrun,value);
#endif
if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun)) {
FkHasNullValueORXcluded=1;
break;
}
memcpy(&(AthisColsVals[colrun][0]),value,strlen(value)+1);
memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
if (Btuple != NULL) {
char *Bvalue = SPI_getvalue(Btuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s,Before value=%s",fooname,*run,thiscolrun,value,Bvalue);
#endif
if (Bvalue == NULL) handleit=1;
else if (strcmp(Bvalue,value)) {
handleit=1;
pfree(Bvalue);
}
}
else handleit=1;
pfree(value);
}
#if defined DEBUG_OUTPUT
elog(NOTICE,"in handleParents--> END OF ATTR LOOP");
elog(NOTICE,"handle it = %d, ",handleit);
elog(NOTICE,"FkHasNullValueORXcluded = %d, ",FkHasNullValueORXcluded);
#endif
if (FkHasNullValueORXcluded || !handleit) continue;
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
int PAR_processed;
HeapTuple PAR_resTuple;
char *value;
q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal Error:no PK for FK relation");
return -2;
}
pfree(q);
PAR_tupTable = SPI_tuptable;
PAR_processed = SPI_processed;
PAR_resTuple = PAR_tupTable->vals[0];
value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
pfree(value);
}
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
confrelname = (char *) DatumGetName(resDatum);
/*
There must be at least 1 column in compound FK!!
*/
WHERE = palloc(MAX_WHERE_CLAUSE);
if (!strncmp(thisColsTypes[0],"int",3))
sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]);
else
sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]);
for (j=1;j<numOfCols;j++) {
if (!strncmp(thisColsTypes[j],"int",3))
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],AthisColsVals[j]);
else
sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],AthisColsVals[j]);
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
//elog(NOTICE,"handleParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found FK, SQL=%s",q);
#endif
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: par table %s should have row with %s",confrelname,WHERE);
return -2;
}
pfree(q);
RealPar_processed = SPI_processed;
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE);
pfree(ParTableName);
pfree(WHERE);
}
return 0;
}
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
/*
sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0;
else return 1;
*/
sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0;
else return 1;
}
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
bool isNull;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
q = palloc(MAX_QUERY_LEN);
sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull));
}
int createAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in createAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
/*
public.dbmirror_accounting.cnt DEFAULT = 1
*/
sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2;
else return 0;
}
char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) {
int2vector *pk;
int i;
char *WHERE = palloc(MAX_WHERE_CLAUSE);
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getPKxpress of Table=%s",cpTableName);
#endif
pk = getPrimaryKey(tableOid);
if (pk == NULL) return NULL;
int16 *pkV = pk->values;
int pkSize = pk->dim1;
if (pkSize>0) {
char *keyname;
char *keyval;
keyname = SPI_fname(tupleDesc,pkV[0]);
if (keyname == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[0],cpTableName);
return NULL;
}
keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]);
if (keyval == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[0],cpTableName);
return NULL;
}
sprintf(WHERE,"\"%s\"=%s",keyname,keyval);
pfree(keyname);
pfree(keyval);
}
for (i=1;i<pkSize;i++) {
char *keyname;
char *keyval;
keyname = SPI_fname(tupleDesc,pkV[i]);
if (keyname == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[i],cpTableName);
return NULL;
}
keyval = SPI_getvalue(tuple,tupleDesc,pkV[i]);
if (keyval == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[i],cpTableName);
return NULL;
}
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,keyname,keyval);
pfree(keyname);
pfree(keyval);
}
pfree(pk);
return WHERE;
}
int *getSlaves(char *cpTableName,char *pkxpress) {
char *q;
int ret;
int SLAVE_processed;
SPITupleTable *SLAVE_tupTable;
int *slaves;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getSlaves of Table=%s, pkxpress=%s",cpTableName,pkxpress);
#endif
if (pkxpress == NULL) return NULL;
q = palloc(MAX_QUERY_LEN);
sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL;
SLAVE_tupTable = SPI_tuptable;
SLAVE_processed = SPI_processed;
slaves = palloc((SLAVE_processed+1) * sizeof(int));
#if defined DEBUG_OUTPUT
elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed);
#endif
for (ret=0;ret<SLAVE_processed;ret++) {
char *slaveidStr = SPI_getvalue(SLAVE_tupTable->vals[ret],SLAVE_tupTable->tupdesc,1);
*(slaves+ret) = atoi(slaveidStr);
pfree(slaveidStr);
}
*(slaves+SLAVE_processed) = 0;
return slaves;
}
int deleteAccnt (char *cpTableName, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
ret = SPI_exec(q,0);
pfree(q);
if (ret == SPI_OK_DELETE) return 0;
else return -2;
}
int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,0);
pfree(q);
if (ret == SPI_OK_DELETE) return 0;
else return -2;
}
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
char *qb;
char *q;
bool isNull;
int ret;
int i;
HeapTuple resTuple;
Datum resDatum;
Oid confrelid;
char *confrelname;
int16 *thisCols;
int16 *fkCols;
int16 *run;
ArrayType *arr;
char BthisColsVals[MAXKCOLS][MAXCOL_LEN];
char fkColsNames[MAXKCOLS][MAXCOL_LEN];
char thisColsTypes[MAXKCOLS][100];
short numOfCols;
SPITupleTable *FK_tupTable;
int FK_processed;
int ParSlaveId;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid);
#endif
qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
q = palloc(strlen(qb)+MAX_OID_LEN+1);
sprintf(q,"%s%d",qb,tableOid);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed < 0) {
elog(NOTICE, "no FKs");
return 0;
}
/*
For every FK dependency we track and handle the parent table
*/
FK_tupTable = SPI_tuptable;
FK_processed = SPI_processed;
for (i=0;i<FK_processed;i++) {
char *WHERE;
char *ParTableName;
int j;
char FkHasNullValueORXcluded=0;
char decreaseit=0;
SPITupleTable *RealPar_tupTable;
int RealPar_processed;
HeapTuple RealPar_tuple;
int colrun;
SPITupleTable *thisatt_tupTable;
int thisatt_processed;
HeapTuple thisatt_tuple;
resTuple = FK_tupTable->vals[i];
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
char *fooname;
q = palloc(512);
sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
return -2;
}
pfree(q);
thisatt_processed = SPI_processed;
thisatt_tupTable = SPI_tuptable;
thisatt_tuple = thisatt_tupTable->vals[0];
fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s",fooname,*run,thiscolrun,value);
#endif
if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun)) {
FkHasNullValueORXcluded=1;
break;
/*
Nothing is done regarding "Before" status of parent since there is not one
*/
}
memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1);
memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
if (Atuple != NULL) {
char *Avalue = SPI_getvalue(Atuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s,After value=%s",fooname,*run,thiscolrun,value,Avalue);
#endif
if (Avalue == NULL) decreaseit=1;
else if (strcmp(Avalue,value)) {
decreaseit=1;
pfree(Avalue);
}
}
else decreaseit=1; /* is delete operation*/
pfree(value);
}
#if defined DEBUG_OUTPUT
elog(NOTICE,"in updateAccnParents--> END OF ATTR LOOP");
elog(NOTICE,"decrease it = %d, ",decreaseit);
elog(NOTICE,"FkHasNullValueORXcluded = %d, ",FkHasNullValueORXcluded);
#endif
if (FkHasNullValueORXcluded || !decreaseit) continue;
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
int PAR_processed;
HeapTuple PAR_resTuple;
char *value;
q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal Error:no PK for FK relation");
return -2;
}
pfree(q);
PAR_tupTable = SPI_tuptable;
PAR_processed = SPI_processed;
PAR_resTuple = PAR_tupTable->vals[0];
value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
pfree(value);
}
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
confrelname = (char *) DatumGetName(resDatum);
/*
there must be at least 1 column in compound FK!!
*/
WHERE = palloc(MAX_WHERE_CLAUSE);
if (!strncmp(thisColsTypes[0],"int",3))
sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]);
else
sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]);
for (j=1;j<numOfCols;j++) {
if (!strncmp(thisColsTypes[j],"int",3))
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],BthisColsVals[j]);
else
sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],BthisColsVals[j]);
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
//elog(NOTICE,"updateAccntParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found FK, SQL=%s",q);
#endif
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: updateAcctParents: par table %s should have row with %s",confrelname,WHERE);
return -2;
}
pfree(q);
RealPar_processed = SPI_processed;
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
ParSlaveId = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) {
pfree(ParTableName);
continue;
}
if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) {
ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f');
ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE);
ret = updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid);
}
pfree(ParTableName);
pfree(WHERE);
}
return 0;
}
[text/x-log] make.log (9.0K, 4-make.log)
download | inline:
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wcast-function-type -Wshadow=compatible-local -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -fPIC -DPIC -fvisibility=hidden -I. -I./ -I/usr/local/pgsql/include/server -I/usr/local/pgsql/include/internal -I/usr/local/include -c -o pending.o pending.c
pending.c: In function 'storePending':
pending.c:249:25: warning: variable 'tCurTuple' set but not used [-Wunused-but-set-variable]
249 | HeapTuple tCurTuple;
| ^~~~~~~~~
In file included from /usr/local/pgsql/include/server/nodes/execnodes.h:34,
from /usr/local/pgsql/include/server/commands/trigger.h:18,
from /usr/local/pgsql/include/server/executor/spi.h:16,
from pending.c:36:
pending.c: In function 'getPrimaryKey':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:402:54: note: in expansion of macro 'PG_DETOAST_DATUM'
402 | tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
In file included from pending.c:34:
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:403:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
403 | int n=tpResultKey->dim1;
| ^~~
pending.c: In function 'packageData':
pending.c:527:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
527 | int16 *tpPKeysV = tpPKeys->values;
| ^~~~~
pending.c:551:29: error: 'struct TupleDescData' has no member named 'attrs'
551 | if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
| ^~
pending.c:571:71: error: 'struct TupleDescData' has no member named 'attrs'
571 | cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
| ^~
pending.c: In function 'isExcluded':
pending.c:664:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
664 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'handler':
pending.c:729:21: warning: variable 'retval' set but not used [-Wunused-but-set-variable]
729 | int retval;
| ^~~~~~
pending.c: In function 'getSlaveId':
pending.c:928:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
928 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'getComputedSlaveId':
pending.c:993:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
993 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'getOldComputedSlaveId':
pending.c:1119:7: warning: unused variable 'ParTableName' [-Wunused-variable]
1119 | char *ParTableName;
| ^~~~~~~~~~~~
pending.c:1116:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
1116 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'handleParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1444:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1444 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1472:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
1472 | char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
| ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1509:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1509 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1513:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
1513 | int PAR_processed;
| ^~~~~~~~~~~~~
pending.c:1434:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
1434 | int thisatt_processed;
| ^~~~~~~~~~~~~~~~~
pending.c:1430:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
1430 | int RealPar_processed;
| ^~~~~~~~~~~~~~~~~
pending.c: In function 'getPKxpress':
pending.c:1651:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
1651 | int16 *pkV = pk->values;
| ^~~~~
pending.c: In function 'updateAccntParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1825:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1825 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1846:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
1846 | char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
| ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1891:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1891 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1895:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
1895 | int PAR_processed;
| ^~~~~~~~~~~~~
pending.c:1814:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
1814 | int thisatt_processed;
| ^~~~~~~~~~~~~~~~~
pending.c:1810:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
1810 | int RealPar_processed;
| ^~~~~~~~~~~~~~~~~
gmake: *** [<builtin>: pending.o] Error 1
[text/x-patch] pending_changes_against_18beta1.patch (3.8K, 5-pending_changes_against_18beta1.patch)
download | inline diff:
diff --git a/code/dbmirror/pending.c b/code/dbmirror/pending.c
index fca76318..b1c61e02 100644
--- a/code/dbmirror/pending.c
+++ b/code/dbmirror/pending.c
@@ -98,7 +98,7 @@ char *get_namespace_name(Oid nspid);
#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
-//#define DEBUG_OUTPUT 1
+#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange);
@@ -399,7 +399,8 @@ getPrimaryKey(Oid tblOid)
return NULL;
}
- tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ tpResultKey = (int2vector *) PG_DETOAST_DATUM(resDatum);
int n=tpResultKey->dim1;
resultKey = palloc(Int2VectorSize(n));
if (n > 0)
@@ -548,7 +549,8 @@ packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid ta
} /* KeyUsage!=ALL */
#ifndef NODROPCOLUMN
// this comment is for 11
- if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+ ////if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+ if (TupleDescAttr(tTupleDesc,iColumnCounter-1)->attisdropped)
//if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
{
/**
@@ -568,7 +570,8 @@ eksairetea columns
}
// this comment is for 11
- cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+ ////cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+ cpFieldName = NameStr(TupleDescAttr(tTupleDesc,iColumnCounter - 1)->attname );
//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldName: %s", cpFieldName);
@@ -1441,7 +1444,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
@@ -1506,7 +1510,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
#endif
if (FkHasNullValueORXcluded || !handleit) continue;
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
@@ -1822,7 +1827,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
@@ -1888,7 +1894,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
^ permalink raw reply [nested|flat] 2+ messages in thread
* Re: Postgresql 18beta1 and SPI changes
2025-05-12 12:50 Re: Postgresql 18beta1 and SPI changes Achilleas Mantzios <[email protected]>
@ 2025-05-13 17:12 ` Achilleas Mantzios <[email protected]>
0 siblings, 0 replies; 2+ messages in thread
From: Achilleas Mantzios @ 2025-05-13 17:12 UTC (permalink / raw)
To: Tom Lane <[email protected]>; +Cc: [email protected] <[email protected]>; [email protected]
On 12/5/25 15:50, Achilleas Mantzios wrote:
>
> Dear All, Dear Tom
>
> On 5/11/25 16:20, Tom Lane wrote:
>
>> Achilleas Mantzios<[email protected]> writes:
>>> We use are own version of DBmirror, we run our replication in a highly
>>> fine grained manner. So every upgrade I have to make the code compile
>>> and test. Up to PostgreSQL 17, I only got minor compilation problems
>>> that I managed to resolve fairly easily. However this didn't prove to be
>>> the case with PostgreSQL 18beta1, it proved harder to compile and as my
>>> fears were verified, it has serious problems.
>>> My question : is 18's SPI stabilized ? Can I start work on our version
>>> of DBmirror ? Or wait for 18beta2 or -RC ?
>> If you think there are changes we need to make, you'd better get
>> specific sooner not later. I'm not aware of any large fixes that
>> are pending, cf
>>
>> https://wiki.postgresql.org/wiki/PostgreSQL_18_Open_Items
>
> I attach
>
> a) our old source (pending.c.orig), as of PostgreSQL 17 (tested for
> some 7 months, so pretty well tested),
>
> b) the compilation errors when compiled against 18beta1, and
>
> c) the patch that I came up with, which seems (in my minimal testing)
> to yield correct results on 18beta1.
>
> The majority of serious warnings have to do with de-toasting arrays
> and the PK's int2vector , while the error has to do with getting
> column details such as attisdropped and attname.
>
> Please have a look, and share your thoughts. I haven't touched serious
> C coding till I first wrote the above sometime in 2004 with a bunch of
> additions some years ago.
>
Hi again
just to close this, it seems that the main issue for the compilation
fail was a change introduced in this commit :
d28dff3f6cd6a7562fb2c211ac0fb74a33ffd032 and had to do with access to
TupleDesc->attrs which does not exist anymore. Some browsing in the
contrib dir provided the hint I needed (to use : TupleDescAttr ).
The funny thing here is that deepseek spotted the issue before I even
pasted any piece of code, just mentioning. It would be nice if those SPI
level changes were mentioned somewhere.
>
>> regards, tom lane
^ permalink raw reply [nested|flat] 2+ messages in thread
end of thread, other threads:[~2025-05-13 17:12 UTC | newest]
Thread overview: 2+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2025-05-12 12:50 Re: Postgresql 18beta1 and SPI changes Achilleas Mantzios <[email protected]>
2025-05-13 17:12 ` Achilleas Mantzios <[email protected]>
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox