public inbox for [email protected]
From: Chao Li <[email protected]>
To: Amit Kapila <[email protected]>
Cc: Euler Taveira <[email protected]>
Cc: GRANT ZHOU <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Dilip Kumar <[email protected]>
Cc: Postgres hackers <[email protected]>
Subject: Re: Improve logical replication usability when tables lack primary keys
Date: Wed, 4 Feb 2026 15:21:32 +0800
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <CAEoWx2mMorbMwjKbT4YCsjDyL3r9Mp+z0bbK57VZ+OkJTgJQVQ@mail.gmail.com>
<CAA4eK1+UL6wVDNzkpHjA7RVLD_8AkrP2tu+RvQ2h5AUjyEe+-Q@mail.gmail.com>
<[email protected]>
<CAFiTN-ucvk8JOiLvjii6VXar6nYJvCQDgzp8_4v55yweUmzdzw@mail.gmail.com>
<[email protected]>
<CAA4eK1KzjxO-qWjWSox6e6AWH4FVU5ZPEgeZ+na=eyov7umutg@mail.gmail.com>
<[email protected]>
<TY4PR01MB16907E1C68EA6EBADCE54ABBE94ABA@TY4PR01MB16907.jpnprd01.prod.outlook.com>
<[email protected]>
<CA+FXcm-YZXJpc6E7XEDTv9Yaic=U7Dwnjj4znxJ4gCxUZMcXww@mail.gmail.com>
<[email protected]>
<[email protected]>
<CAA4eK1KMOTYGSQdoPBZsJ6T-QEw5eYHG0padQXTViu7LeeFWMw@mail.gmail.com>
<[email protected]>
> On Dec 30, 2025, at 16:07, Chao Li <[email protected]> wrote:
>
>
>
>> On Dec 22, 2025, at 19:48, Amit Kapila <[email protected]> wrote:
>>
>> On Fri, Dec 19, 2025 at 1:39 PM Chao Li <[email protected]> wrote:
>>>
>>>> On Dec 18, 2025, at 22:49, Euler Taveira <[email protected]> wrote:
>>>>
>>>> On Wed, Dec 17, 2025, at 6:43 PM, GRANT ZHOU wrote:
>>>>> On Wed, Dec 17, 2025 at 12:50 PM Euler Taveira <[email protected]> wrote:
>>>>>> Each table needs to say what its row identifier is. The user created a table
>>>>>> without a primary key. Well, create a primary key. There are tens of thousands
>>>>>> of objects. Use a script.
>>>>> However, I’d like to share a user perspective regarding the "use a
>>>>> script" approach. The main value of `FOR TABLES IN SCHEMA` is
>>>>> *in-database automation*. If users still need to maintain external
>>>>> scripts to monitor and `ALTER` new tables to prevent replication
>>>>> errors, it significantly diminishes the value of that automation.
>>>>>
>>>>
>>>> As I tried to explain in the previous email, the problem with FOR ALL TABLES
>>>> and FOR TABLES IN SCHEMA syntax is that there is no catalog information about the
>>>> relations; the list of relations is collected at runtime.
>>>>
>>>> When I suggested "use a script" I was referring to fixing the logical replication
>>>> setup with respect to the missing primary key. There is no need for automation
>>>> outside the database; use an event trigger. If your lazy user doesn't create
>>>> the primary key, assign REPLICA IDENTITY FULL. Something like:
>>>>
>>>> -- This example is far from being a complete solution for fixing the lack of
>>>> -- primary key in a logical replication scenario.
>>>> -- ALTER TABLE should be supported too
>>>> CREATE OR REPLACE FUNCTION event_trigger_for_replica_identity()
>>>> RETURNS event_trigger LANGUAGE plpgsql AS $$
>>>> DECLARE
>>>>     obj record;
>>>>     rec record;
>>>>     ricnt integer := 0;
>>>> BEGIN
>>>>     FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands()
>>>>     LOOP
>>>>         IF obj.command_tag = 'CREATE TABLE' THEN
>>>>             SELECT COUNT(*) INTO ricnt FROM pg_index WHERE indrelid = obj.objid AND indisprimary;
>>>>             RAISE NOTICE 'ricnt: %', ricnt;
>>>>             IF ricnt = 0 THEN
>>>>                 EXECUTE 'ALTER TABLE ' || obj.object_identity || ' REPLICA IDENTITY FULL';
>>>>             END IF;
>>>>         END IF;
>>>>     END LOOP;
>>>> END;
>>>> $$;
>>>>
>>>> CREATE EVENT TRIGGER event_trigger_for_replica_identity
>>>> ON ddl_command_end
>>>> EXECUTE FUNCTION event_trigger_for_replica_identity();
>>>>
>>>> CREATE TABLE event_trigger_test_1 (a int);
>>>> \d+ event_trigger_test_1
>>>> CREATE TABLE event_trigger_test_2 (a int primary key);
>>>> \d+ event_trigger_test_2
>>>> CREATE TABLE event_trigger_test_3 (a int, b text not null, primary key(b));
>>>> \d+ event_trigger_test_3
>>>> --ALTER TABLE event_trigger_test_3 DROP CONSTRAINT event_trigger_test_3_pkey;
>>>> --\d+ event_trigger_test_3
>>>>
>>>> DROP EVENT TRIGGER event_trigger_for_replica_identity;
>>>> DROP FUNCTION event_trigger_for_replica_identity;
>>>> DROP TABLE event_trigger_test_1, event_trigger_test_2, event_trigger_test_3;
>>>>
>>>> 8<----------------------------------------------------------------------------8<
>>>>
>>>>> Additionally, tables without Primary Keys are valid SQL and extremely
>>>>> common in enterprise environments (e.g., audit logs, data warehousing).
>>>>> In large-scale deployments, enforcing PKs on every single table isn't
>>>>> always practical.
>>>>>
>>>>
>>>> I'm not saying users shouldn't create tables without a primary key. I'm arguing
>>>> that this decision should take into account what adjustments need to be made to
>>>> use these tables in logical replication.
>>>>
>>>>>
>>>>> I think the goal of this proposal is not to change the underlying table
>>>>> property design, but rather to seek a mechanism (like a Publication
>>>>> option) to ensure this automation functions safely without external
>>>>> intervention. It is simply about allowing the database to handle these
>>>>> valid, common scenarios gracefully when automation is enabled.
>>>>>
>>>>
>>>> You didn't get it. You already have one property to handle it and you are
>>>> proposing to add a second property to handle it.
>>>>
>>>> I think you are pursuing the wrong solution. IMO we need a solution to enforce
>>>> that the logical replication contract is valid. If you create or modify a table
>>>> that is part of a publication, there is no validation that that table complies
>>>> with the publication properties (update and delete properties should require an
>>>> appropriate replica identity). We should close the gaps in both publication and
>>>> table.
>>>>
>>
>> If we want, we can ensure that any table added to that specific
>> publication (that has an option replica_identity='full') would
>> automatically override the default to FULL if no PK is available.
>> This information can be cached to avoid overhead.
>>
>>>
>>> If I summarize Euler’s position in short words: discipline over convenience. I actually strongly agree with that. In PG we generally prefer explicit over implicit behavior, and predictability over magic.
>>>
>>
>> You haven't explained why we can't consider a custom event trigger as
>> suggested by Euler for customers who are not willing to change the RI
>> default explicitly for each table. I think it is worth considering
>> providing a custom solution outside core-postgres for your customers
>> for this specific case.
>
> Thanks for raising this. Let me clarify why we don’t consider a custom event trigger a satisfactory solution in practice, even though it is technically possible.
>
> I discussed this with our field teams, and some customers have indeed experimented with event-trigger-based solutions before. However, they generally don’t prefer them for this use case.
>
> First, the required logic is non-trivial and fragile. The trigger would need to track table creation, primary key creation and removal, and distinguish between cases where REPLICA IDENTITY FULL was set implicitly versus explicitly by the user. Handling all these cases correctly makes the solution feel like a workaround rather than a robust enforcement mechanism.
>
> Second, event triggers introduce operational risk. They need to be installed, monitored, and maintained separately from the core system. If a trigger is accidentally dropped, disabled, or modified, the behavior silently changes, which is particularly risky for replication semantics.
>
> Third, customers place much higher trust in core PostgreSQL behavior than in custom scripts layered on top. Issues caused by core behavior are seen as something that can be understood, worked around, or fixed by upgrading, whereas failures caused by custom triggers are harder to diagnose and are often attributed to the overall solution quality.
>
> For these reasons, while event triggers can work as a stopgap, our customers strongly prefer a solution where the replication contract is enforced by core PostgreSQL rather than external mechanisms.
>
>>
>>> Based on the discussion so far, I think we share the following design goals:
>>>
>>> 1) Keep replica identity as a table property.
>>> 2) Avoid silent runtime failures when FOR TABLES IN SCHEMA pulls in tables without primary keys.
>>> 3) Avoid global or implicit behavior changes.
>>> 4) Preserve explicit opt-in for higher WAL cost.
>>> 5) Keep the logical replication contract explicit and enforceable.
>>>
>>> I’ve been thinking about whether adding a new replica identity could meet these goals.
>>>
>>> Today we have four replica identities: DEFAULT (PK, fallback to NONE), INDEX, FULL, and NONE.
>>>
>>> What if we introduce a new replica identity, tentatively called “FORCE”: PK with fallback to FULL. (Let’s keep our focus on the design, not argue the name for now.)
>>>
>>> With this approach:
>>>
>>> 1) Replica identity remains a table property.
>>> 2) Publication membership is still evaluated at runtime, so FOR TABLES IN SCHEMA is not special-cased.
>>> 3) No new GUCs are required.
>>> 4) The user must explicitly opt in by setting the replica identity. Once FORCE is chosen, adding or dropping a primary key later does not silently break UPDATE/DELETE replication.
>>>
>>> 5) The logical replication contract remains explicit; the table declares that it is safe for UPDATE/DELETE replication even without a PK, at the cost of higher WAL volume.
>>>
>>> This feels like a small, explicit extension of the existing RI semantics. Notably, REPLICA IDENTITY DEFAULT already has conditional behavior (PK, with fallback
>>> to NONE), so conditional RI behavior is not new; this would just make a different fallback explicit and user-chosen.
>>>
>>> After that, we could consider a database-level default_replica_identity setting, applied at table creation time, for environments that want this behavior consistently. But that would only make sense if we first agree on the table-level mechanism.
>>>
>>
>> I don't much like the database-level option as it expects a new
>> default to be introduced. I think the internal workings will be almost
>> the same as for the publication-level option.
>
> That’s fair. I agree that a database-level option wouldn’t be fundamentally different from a publication-level solution and would likely share most of the same internal mechanics.
>
> At this point nothing is decided yet; we’re still exploring different approaches and trying to understand the trade-offs.
>
> I have a question to better understand how a publication-level approach would behave in edge cases.
>
> Since replica identity is defined on tables and a table can belong to multiple publications, how should UPDATE/DELETE be handled if the same table is added to two publications with different expectations?
>
> For example, suppose a table without a PK is added to:
> - pub_a, which does not require FULL (or effectively falls back to NONE)
> - pub_b, which requires FULL for UPDATE/DELETE
>
> In this case, should UPDATE/DELETE on the table be allowed at all, and if so, based on which publication’s semantics? What do you think?
>
> Best regards,
> --
> Chao Li (Evan)
> HighGo Software Co., Ltd.
> https://www.highgo.com/
Hi Amit,
Following your suggestion, I implemented a PoC that adds a new publication parameter (tentatively named fallbackfull) to enable the DEFAULT → FULL fallback on a per-publication basis. I’m not attached to the parameter name; if we decide to go with the publication approach, I’m happy to adjust the naming based on feedback.
After playing with this implementation for a couple of days, I ran into a few concerns:
1. Protocol extension required
If the DEFAULT → FULL fallback is triggered, the subscriber needs to know whether the corresponding publication has fallbackfull enabled in order to decide how to apply UPDATE/DELETE. That means we’d need to extend the logical replication protocol, e.g., by adding a new field to the RELATION message to carry the fallbackfull flag.
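To make point 1 concrete, here is a minimal C sketch of the version-gated flag. It is loosely modeled on the logicalrep_write_rel()/logicalrep_read_rel() hunks in the attached patch. SketchBuf is a toy stand-in for StringInfo. SKETCH_FALLBACKFULL_VERSION = 5 is an assumed value, not the actual value of the patch's LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed protocol version that first carries the flag (hypothetical value). */
#define SKETCH_FALLBACKFULL_VERSION 5

/* Toy stand-in for StringInfo: a fixed buffer with write and read offsets. */
typedef struct
{
    uint8_t data[16];
    size_t  len;     /* number of bytes written */
    size_t  cursor;  /* next byte to read */
} SketchBuf;

/* Publisher side: replica identity byte, then the flag only on new protocols. */
static void
sketch_write_rel_identity(SketchBuf *buf, char replident, bool fallbackfull,
                          uint32_t proto_version)
{
    assert(buf->len + 2 <= sizeof(buf->data));
    buf->data[buf->len++] = (uint8_t) replident;
    if (proto_version >= SKETCH_FALLBACKFULL_VERSION)
        buf->data[buf->len++] = fallbackfull ? 1 : 0;
}

/* Subscriber side: must read with the same version the publisher wrote with. */
static void
sketch_read_rel_identity(SketchBuf *buf, uint32_t proto_version,
                         char *replident, bool *fallbackfull)
{
    assert(buf->cursor < buf->len);
    *replident = (char) buf->data[buf->cursor++];
    if (proto_version >= SKETCH_FALLBACKFULL_VERSION)
    {
        assert(buf->cursor < buf->len);
        *fallbackfull = (buf->data[buf->cursor++] != 0);
    }
    else
        *fallbackfull = false;  /* older peers never send the flag */
}
```

Both ends have to agree on the negotiated protocol version. Suppose the publisher writes the extra byte but the subscriber reads with an older version. The subscriber would then consume that byte as the start of the attribute data. Every decoding plugin that emits or parses this message would need the same handling.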
2. Impact on decoding plugins
Decoding plugins would need to understand this new flag. In my PoC, I updated pgoutput, but there may be third-party plugins that would also need changes. That feels like a compatibility risk.
3. Potential data-integrity issues
This is the most concerning part to me.
Consider a table t1 with REPLICA IDENTITY DEFAULT and no primary key, included in publication p1. By design, UPDATE/DELETE on t1 are not allowed.
However, a user could work around this by creating a dummy publication, adding t1 to it, and setting fallbackfull = true on that publication. This would effectively enable UPDATE/DELETE on t1.
Later, if the owner of p1 decides to enable fallbackfull on p1 to replicate t1, the subscriber of p1 may already be out of sync due to the earlier updates/deletes performed via the dummy publication. At that point, subsequent UPDATE/DELETE replication may fail or behave incorrectly.
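The root of point 3 is the "any publication" rule in the patch's logicalrep_identity_is_full(). Below is a minimal C model of that rule. It uses a hypothetical SketchPub struct in place of the GetRelationPublications()/GetPublication() catalog lookups. It shows how the dummy publication changes t1's behavior without touching p1.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* pg_class.relreplident codes, as in PostgreSQL's pg_class.h */
#define RI_DEFAULT 'd'
#define RI_NOTHING 'n'
#define RI_FULL    'f'
#define RI_INDEX   'i'

/* Hypothetical, minimal model of a publication: only the new flag matters. */
typedef struct
{
    const char *name;
    bool        fallbackfull;
} SketchPub;

/*
 * Models the PoC's logicalrep_identity_is_full(): the table is treated as
 * FULL if it is declared FULL, or if it is DEFAULT without a replica index
 * (no PK) and ANY publication containing it has fallbackfull enabled.
 */
static bool
sketch_identity_is_full(char replident, bool has_replica_index,
                        const SketchPub *pubs, size_t npubs)
{
    if (replident == RI_FULL)
        return true;
    if (replident == RI_DEFAULT && !has_replica_index)
    {
        for (size_t i = 0; i < npubs; i++)
            if (pubs[i].fallbackfull)
                return true;
    }
    return false;
}
```

With t1 only in p1, the function returns false. Once t1 is also in a publication with fallbackfull = true, it returns true, even though p1 itself is unchanged. The old tuple is WAL-logged once per change, not once per publication, so the decision made for the dummy publication leaks into what p1 sees.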
From this perspective, allowing fallbackfull at the publication level seems to open the door to cross-publication interference and data divergence.
Given these concerns, I’m leaning toward keeping fallbackfull as a per-table option rather than a per-publication one. Curious to hear your thoughts.
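For comparison, a per-table option could look like the FORCE replica identity floated earlier in this thread (PK, with fallback to FULL). The sketch below resolves the effective identity from table state alone. RI_FORCE and its code 'x' are hypothetical; the other codes match pg_class.relreplident.

```c
#include <assert.h>
#include <stdbool.h>

#define RI_DEFAULT 'd'
#define RI_NOTHING 'n'
#define RI_FULL    'f'
#define RI_INDEX   'i'
/* Hypothetical code for the proposed FORCE identity; not a real PostgreSQL value. */
#define RI_FORCE   'x'

typedef enum
{
    EFF_NOTHING,  /* no old-row identity; published UPDATE/DELETE are rejected */
    EFF_KEY,      /* old key columns from the PK or the chosen index */
    EFF_FULL      /* entire old row */
} SketchEffectiveIdentity;

/* Effective identity depends only on the table's own declaration and indexes. */
static SketchEffectiveIdentity
sketch_effective_identity(char replident, bool has_pk, bool has_ri_index)
{
    switch (replident)
    {
        case RI_FULL:
            return EFF_FULL;
        case RI_INDEX:
            /* if the identity index is dropped, behavior is the same as NOTHING */
            return has_ri_index ? EFF_KEY : EFF_NOTHING;
        case RI_DEFAULT:
            return has_pk ? EFF_KEY : EFF_NOTHING;
        case RI_FORCE:
            return has_pk ? EFF_KEY : EFF_FULL;
        case RI_NOTHING:
        default:
            return EFF_NOTHING;
    }
}
```

Because the result depends only on the table, adding the table to another publication cannot change it. That rules out the dummy-publication problem described above. It also keeps the subscriber's view consistent with the replica identity byte already sent in the RELATION message.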
I’ve attached v2 of the PoC implementing the publication-level approach for reference.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
Attachments:
[application/octet-stream] v2-0001-Add-fallbackfull-option-to-publication.patch (30.4K, 2-v2-0001-Add-fallbackfull-option-to-publication.patch)
From a128e79d5da7b70386fe0063db09812255827eb2 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <[email protected]>
Date: Mon, 2 Feb 2026 15:50:57 +0800
Subject: [PATCH v2] Add fallbackfull option to publication
When a publication's fallbackfull option is true, tables in the
publication whose replica identity is DEFAULT but which have no PK
fall back to replica identity FULL.
The code change is currently of PoC quality and includes debug logs,
so reviewers should focus only on the design.
Author: Chao Li <[email protected]>
---
src/backend/access/heap/heapam.c | 61 ++++++++++++---------
src/backend/catalog/pg_publication.c | 1 +
src/backend/commands/publicationcmds.c | 58 +++++++++++++++++---
src/backend/executor/execReplication.c | 2 +-
src/backend/replication/logical/proto.c | 27 ++++++++-
src/backend/replication/logical/relation.c | 50 ++++++++++++++++-
src/backend/replication/logical/worker.c | 28 +++++++++-
src/backend/replication/pgoutput/pgoutput.c | 33 ++++++++++-
src/include/catalog/pg_publication.h | 4 ++
src/include/replication/logicalproto.h | 13 ++++-
src/include/replication/logicalrelation.h | 1 +
11 files changed, 233 insertions(+), 45 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3004964ab7f..d4f78b9807e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -60,7 +60,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup,
HeapTuple newtup, HeapTuple old_key_tuple,
- bool all_visible_cleared, bool new_all_visible_cleared);
+ bool all_visible_cleared, bool new_all_visible_cleared,
+ bool logical_identity_is_full);
#ifdef USE_ASSERT_CHECKING
static void check_lock_if_inplace_updateable_rel(Relation relation,
const ItemPointerData *otid,
@@ -107,7 +108,7 @@ static void index_delete_sort(TM_IndexDeleteOp *delstate);
static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
- bool *copy);
+ bool *copy, bool *ri_is_full);
/*
@@ -2859,6 +2860,7 @@ heap_delete(Relation relation, const ItemPointerData *tid,
bool all_visible_cleared = false;
HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */
bool old_key_copied = false;
+ bool logical_identity_is_full = false;
Assert(ItemPointerIsValid(tid));
@@ -3088,7 +3090,7 @@ l1:
* Compute replica identity tuple before entering the critical section so
* we don't PANIC upon a memory allocation failure.
*/
- old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+ old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied, &logical_identity_is_full);
/*
* If this is the first possibly-multixact-able operation in the current
@@ -3170,13 +3172,13 @@ l1:
xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
xlrec.xmax = new_xmax;
- if (old_key_tuple != NULL)
- {
- if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
- else
- xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
- }
+ if (old_key_tuple != NULL)
+ {
+ if (logical_identity_is_full)
+ xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
+ }
XLogBeginInsert();
XLogRegisterData(&xlrec, SizeOfHeapDelete);
@@ -3326,6 +3328,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
HeapTuple heaptup;
HeapTuple old_key_tuple = NULL;
bool old_key_copied = false;
+ bool logical_identity_is_full = false;
Page page;
BlockNumber block;
MultiXactStatus mxact_status;
@@ -4113,7 +4116,7 @@ l2:
old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,
bms_overlap(modified_attrs, id_attrs) ||
id_has_external,
- &old_key_copied);
+ &old_key_copied, &logical_identity_is_full);
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@@ -4200,11 +4203,12 @@ l2:
log_heap_new_cid(relation, heaptup);
}
- recptr = log_heap_update(relation, buffer,
- newbuf, &oldtup, heaptup,
- old_key_tuple,
- all_visible_cleared,
- all_visible_cleared_new);
+ recptr = log_heap_update(relation, buffer,
+ newbuf, &oldtup, heaptup,
+ old_key_tuple,
+ all_visible_cleared,
+ all_visible_cleared_new,
+ logical_identity_is_full);
if (newbuf != buffer)
{
PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -8918,7 +8922,8 @@ static XLogRecPtr
log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
HeapTuple old_key_tuple,
- bool all_visible_cleared, bool new_all_visible_cleared)
+ bool all_visible_cleared, bool new_all_visible_cleared,
+ bool logical_identity_is_full)
{
xl_heap_update xlrec;
xl_heap_header xlhdr;
@@ -9007,14 +9012,14 @@ log_heap_update(Relation reln, Buffer oldbuf,
xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD;
if (need_tuple_data)
{
- xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
- if (old_key_tuple)
- {
- if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE;
- else
- xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY;
- }
+ xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
+ if (old_key_tuple)
+ {
+ if (logical_identity_is_full)
+ xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY;
+ }
}
/* If new tuple is the single and first tuple on page... */
@@ -9219,7 +9224,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
*/
static HeapTuple
ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
- bool *copy)
+ bool *copy, bool *ri_is_full)
{
TupleDesc desc = RelationGetDescr(relation);
char replident = relation->rd_rel->relreplident;
@@ -9229,6 +9234,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
Datum values[MaxHeapAttributeNumber];
*copy = false;
+ *ri_is_full = false;
if (!RelationIsLogicallyLogged(relation))
return NULL;
@@ -9236,7 +9242,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
if (replident == REPLICA_IDENTITY_NOTHING)
return NULL;
- if (replident == REPLICA_IDENTITY_FULL)
+ if (logicalrep_identity_is_full(relation))
{
/*
* When logging the entire old tuple, it very well could contain
@@ -9247,6 +9253,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
*copy = true;
tp = toast_flatten_tuple(tp, desc);
}
+ *ri_is_full = true;
return tp;
}
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9a4791c573e..8e17ead4f84 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1097,6 +1097,7 @@ GetPublication(Oid pubid)
pub->pubactions.pubtruncate = pubform->pubtruncate;
pub->pubviaroot = pubform->pubviaroot;
pub->pubgencols_type = pubform->pubgencols;
+ pub->pubfallbackfull = pubform->pubfallbackfull;
ReleaseSysCache(tup);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index fc3a4c19e65..de0245530fe 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -81,13 +81,16 @@ parse_publication_options(ParseState *pstate,
bool *publish_via_partition_root_given,
bool *publish_via_partition_root,
bool *publish_generated_columns_given,
- char *publish_generated_columns)
+ char *publish_generated_columns,
+ bool *fallbackfull_given,
+ bool *fallbackfull)
{
ListCell *lc;
*publish_given = false;
*publish_via_partition_root_given = false;
*publish_generated_columns_given = false;
+ *fallbackfull_given = false;
/* defaults */
pubactions->pubinsert = true;
@@ -96,6 +99,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubtruncate = true;
*publish_via_partition_root = false;
*publish_generated_columns = PUBLISH_GENCOLS_NONE;
+ *fallbackfull = false;
/* Parse options */
foreach(lc, options)
@@ -168,6 +172,13 @@ parse_publication_options(ParseState *pstate,
*publish_generated_columns_given = true;
*publish_generated_columns = defGetGeneratedColsOption(defel);
}
+ else if (strcmp(defel->defname, "fallbackfull") == 0)
+ {
+ if (*fallbackfull_given)
+ errorConflictingDefElem(defel, pstate);
+ *fallbackfull_given = true;
+ *fallbackfull = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -281,6 +292,7 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
bool result = false;
Datum rfdatum;
bool rfisnull;
+ Publication *pub;
/*
* FULL means all columns are in the REPLICA IDENTITY, so all columns are
@@ -289,6 +301,20 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
return false;
+ /*
+ * If REPLICA IDENTITY is DEFAULT and no replica index exists, see if we
+ * should fall back to FULL.
+ */
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT &&
+ !OidIsValid(RelationGetReplicaIndex(relation)))
+ {
+ pub = GetPublication(pubid);
+ if (pub->pubfallbackfull)
+ {
+ return false;
+ }
+ }
+
/*
* For a partition, if pubviaroot is true, find the topmost ancestor that
* is published via this publication as we need to use its row filter
@@ -394,9 +420,12 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
pub = GetPublication(pubid);
check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
- if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL
+ || (relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT &&
+ !OidIsValid(RelationGetReplicaIndex(relation)) &&
+ pub->pubfallbackfull))
{
- /* With REPLICA IDENTITY FULL, no column list is allowed. */
+ /* With REPLICA IDENTITY FULL or fallback to FULL, no column list is allowed. */
*invalid_column_list = (columns != NULL);
/*
@@ -842,6 +871,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool publish_via_partition_root;
bool publish_generated_columns_given;
char publish_generated_columns;
+ bool fallbackfull_given;
+ bool fallbackfull;
AclResult aclresult;
List *relations = NIL;
List *schemaidlist = NIL;
@@ -886,11 +917,13 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
&publish_via_partition_root_given,
&publish_via_partition_root,
&publish_generated_columns_given,
- &publish_generated_columns);
+ &publish_generated_columns,
+ &fallbackfull_given,
+ &fallbackfull);
if (stmt->for_all_sequences &&
(publish_given || publish_via_partition_root_given ||
- publish_generated_columns_given))
+ publish_generated_columns_given || fallbackfull_given))
ereport(NOTICE,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
@@ -914,6 +947,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum(publish_via_partition_root);
values[Anum_pg_publication_pubgencols - 1] =
CharGetDatum(publish_generated_columns);
+ values[Anum_pg_publication_pubfallbackfull - 1] =
+ BoolGetDatum(fallbackfull);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -1010,6 +1045,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
bool publish_via_partition_root;
bool publish_generated_columns_given;
char publish_generated_columns;
+ bool fallbackfull_given;
+ bool fallbackfull;
ObjectAddress obj;
Form_pg_publication pubform;
List *root_relids = NIL;
@@ -1023,11 +1060,13 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
&publish_via_partition_root_given,
&publish_via_partition_root,
&publish_generated_columns_given,
- &publish_generated_columns);
+ &publish_generated_columns,
+ &fallbackfull_given,
+ &fallbackfull);
if (pubform->puballsequences &&
(publish_given || publish_via_partition_root_given ||
- publish_generated_columns_given))
+ publish_generated_columns_given || fallbackfull_given))
ereport(NOTICE,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
@@ -1143,6 +1182,11 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
replaces[Anum_pg_publication_pubgencols - 1] = true;
}
+ if (fallbackfull_given)
+ {
+ values[Anum_pg_publication_pubfallbackfull - 1] = BoolGetDatum(fallbackfull);
+ replaces[Anum_pg_publication_pubfallbackfull - 1] = true;
+ }
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 743b1ee2b28..0605f18e0e9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -1089,7 +1089,7 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
return;
/* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ if (logicalrep_identity_is_full(rel))
return;
/*
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3950dd0cf46..36fed2c686b 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -452,6 +452,10 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
bool binary, Bitmapset *columns,
PublishGencolsType include_gencols_type)
{
+ ereport(LOG,
+ (errmsg("EVAN: logical replication: send UPDATE for relation \"%s\" (oid %u)",
+ RelationGetRelationName(rel), RelationGetRelid(rel))));
+
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
@@ -534,6 +538,10 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ ereport(LOG,
+ (errmsg("EVAN: logical replication: send DELETE for relation \"%s\" (oid %u)",
+ RelationGetRelationName(rel), RelationGetRelid(rel))));
+
pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
/* transaction ID (if not valid, we're not streaming) */
@@ -666,10 +674,15 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
void
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
Bitmapset *columns,
- PublishGencolsType include_gencols_type)
+ PublishGencolsType include_gencols_type,
+ bool fallbackfull, uint32 proto_version)
{
char *relname;
+ ereport(LOG,
+ (errmsg("EVAN: logical replication: send RELATION for \"%s\" (oid %u), fallbackfull=%s",
+ RelationGetRelationName(rel), RelationGetRelid(rel), fallbackfull ? "true" : "false")));
+
pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
/* transaction ID (if not valid, we're not streaming) */
@@ -687,6 +700,10 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
/* send replica identity */
pq_sendbyte(out, rel->rd_rel->relreplident);
+ /* send publication fallbackfull flag if supported */
+ if (proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM)
+ pq_sendbyte(out, fallbackfull ? 1 : 0);
+
/* send the attribute info */
logicalrep_write_attrs(out, rel, columns, include_gencols_type);
}
@@ -695,7 +712,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
* Read the relation info from stream and return as LogicalRepRelation.
*/
LogicalRepRelation *
-logicalrep_read_rel(StringInfo in)
+logicalrep_read_rel(StringInfo in, uint32 proto_version)
{
LogicalRepRelation *rel = palloc_object(LogicalRepRelation);
@@ -708,6 +725,12 @@ logicalrep_read_rel(StringInfo in)
/* Read the replica identity. */
rel->replident = pq_getmsgbyte(in);
+ /* Read publication fallbackfull flag if present. */
+ if (proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM)
+ rel->fallbackfull = (pq_getmsgbyte(in) != 0);
+ else
+ rel->fallbackfull = false;
+
/* relkind is not sent */
rel->relkind = 0;
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 0b1d80b5b0f..28b43e901f3 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -21,6 +21,7 @@
#include "access/genam.h"
#include "access/table.h"
#include "catalog/namespace.h"
+#include "catalog/pg_publication.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
@@ -209,6 +210,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
(remoterel->relkind == 0) ? RELKIND_RELATION : remoterel->relkind;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+ entry->remoterel.fallbackfull = remoterel->fallbackfull;
MemoryContextSwitchTo(oldctx);
}
@@ -326,7 +328,10 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
* If no replica identity index and no PK, the published table must
* have replica identity FULL.
*/
- if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+ if (idkey == NULL &&
+ remoterel->replident != REPLICA_IDENTITY_FULL &&
+ !(remoterel->replident == REPLICA_IDENTITY_DEFAULT &&
+ remoterel->fallbackfull))
entry->updatable = false;
}
@@ -713,6 +718,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
+ entry->remoterel.fallbackfull = remoterel->fallbackfull;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
}
@@ -777,6 +783,44 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
return entry;
}
+/*
+ * logicalrep_identity_is_full
+ *
+ * Check whether the replica identity of the relation is full or not.
+ * When a table's replica identity is default, but there is no primary key,
+ * if any publication the relation is in has fallbackfull enabled, we consider
+ * the replica identity as full. This function should only be called on the
+ * publisher.
+ */
+bool
+logicalrep_identity_is_full(Relation relation)
+{
+ Form_pg_class relform = RelationGetForm(relation);
+
+ if (relform->relreplident == REPLICA_IDENTITY_FULL)
+ return true;
+
+ if (relform->relreplident == REPLICA_IDENTITY_DEFAULT &&
+ !OidIsValid(RelationGetReplicaIndex(relation)))
+ {
+ /* Default identity but no primary key: check whether we can fall back to full. */
+ List *pubids = GetRelationPublications(RelationGetRelid(relation));
+ foreach_oid(pubid, pubids)
+ {
+ Publication *pub = GetPublication(pubid);
+
+ if (pub->pubfallbackfull)
+ {
+ list_free(pubids);
+ return true;
+ }
+ }
+ list_free(pubids);
+ }
+
+ return false;
+}
+
/*
* Returns the oid of an index that can be used by the apply worker to scan
* the relation.
@@ -938,7 +982,9 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
if (OidIsValid(idxoid))
return idxoid;
- if (remoterel->replident == REPLICA_IDENTITY_FULL)
+ if (remoterel->replident == REPLICA_IDENTITY_FULL ||
+ (remoterel->replident == REPLICA_IDENTITY_DEFAULT &&
+ remoterel->fallbackfull))
{
/*
* We are looking for one more opportunity for using an index. If
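The relation.c changes reduce to two predicates. The first is evaluated on the publisher over the table's publications. The second is evaluated on the subscriber over the replident/fallbackfull pair received in the RELATION message. The code below is a minimal standalone model of that logic, not PostgreSQL code. publisher_identity_is_full(), subscriber_treats_as_full() and RemoteRelModel are hypothetical names. The catalog lookups (RelationGetReplicaIndex(), GetRelationPublications(), GetPublication()) are replaced by a has_pk flag and a plain bool array. The REPLICA_IDENTITY_* characters are the ones stored in pg_class.relreplident.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same characters PostgreSQL stores in pg_class.relreplident. */
#define REPLICA_IDENTITY_DEFAULT 'd'
#define REPLICA_IDENTITY_NOTHING 'n'
#define REPLICA_IDENTITY_FULL    'f'
#define REPLICA_IDENTITY_INDEX   'i'

/*
 * Publisher side, modeled on logicalrep_identity_is_full(): FULL always
 * counts; DEFAULT without a primary key counts only if at least one
 * publication containing the table has fallbackfull enabled.
 * Hypothetical signature: publications are a plain bool array.
 */
static bool
publisher_identity_is_full(char relreplident, bool has_pk,
                           const bool *pub_fallbackfull, size_t npubs)
{
    assert(npubs == 0 || pub_fallbackfull != NULL);

    if (relreplident == REPLICA_IDENTITY_FULL)
        return true;

    if (relreplident == REPLICA_IDENTITY_DEFAULT && !has_pk)
    {
        for (size_t i = 0; i < npubs; i++)
            if (pub_fallbackfull[i])
                return true;
    }
    return false;
}

/* Hypothetical model of the LogicalRepRelation fields the patch reads. */
typedef struct RemoteRelModel
{
    char replident;     /* remote replica identity */
    bool fallbackfull;  /* flag received in the RELATION message */
} RemoteRelModel;

/*
 * Subscriber side: the condition added to logicalrep_rel_mark_updatable(),
 * FindLogicalRepLocalIndex() and the Assert in FindReplTupleInLocalRel().
 */
static bool
subscriber_treats_as_full(const RemoteRelModel *r)
{
    return r->replident == REPLICA_IDENTITY_FULL ||
           (r->replident == REPLICA_IDENTITY_DEFAULT && r->fallbackfull);
}
```

On the subscriber the check is deliberately limited to REPLICA_IDENTITY_DEFAULT. An 'i' or 'n' identity with fallbackfull set is therefore still not treated as full.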
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 32725c48623..ad683eb822d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,6 +486,7 @@ static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
+static uint32 LogicalRepProtoVersion = LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM;
static TransactionId stream_xid = InvalidTransactionId;
@@ -2567,7 +2568,7 @@ apply_handle_relation(StringInfo s)
if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
return;
- rel = logicalrep_read_rel(s);
+ rel = logicalrep_read_rel(s, LogicalRepProtoVersion);
logicalrep_relmap_update(rel);
/* Also reset all entries in the partition map that refer to remoterel. */
@@ -3050,6 +3051,24 @@ apply_handle_delete(StringInfo s)
/* Check if we can do the delete. */
check_relation_updatable(rel);
+ /*
+ * Before fallbackfull was added as a publication option, a table whose
+ * replica identity is default but which has no PK could not be updated or
+ * deleted, so check_relation_updatable() treated such a change as an error.
+ *
+ * With fallbackfull, however, such a table may be modified because it can
+ * belong to another publication that has fallbackfull enabled. So here we
+ * just skip the delete if the table is not updatable in the current
+ * publication.
+ */
+ if (!rel->updatable)
+ {
+ /* XXX: should close with NoLock or RowExclusiveLock??? */
+ logicalrep_rel_close(rel, NoLock);
+ end_replication_step();
+ return;
+ }
+
/*
* Make sure that any user-supplied code runs as the table owner, unless
* the user has opted out of that behavior.
@@ -3189,7 +3208,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
Assert(OidIsValid(localidxoid) ||
- (remoterel->replident == REPLICA_IDENTITY_FULL));
+ (remoterel->replident == REPLICA_IDENTITY_FULL) ||
+ (remoterel->replident == REPLICA_IDENTITY_DEFAULT && remoterel->fallbackfull));
if (OidIsValid(localidxoid))
{
@@ -5523,11 +5543,15 @@ set_stream_options(WalRcvStreamOptions *options,
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options->proto.logical.proto_version =
+ server_version >= 190000 ? LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM :
server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM;
+ /* Cache the chosen protocol version for RELATION message parsing. */
+ LogicalRepProtoVersion = options->proto.logical.proto_version;
+
options->proto.logical.publication_names = MySubscription->publications;
options->proto.logical.binary = MySubscription->binary;
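On the subscriber, set_stream_options() caches the version it chooses so that apply_handle_relation() can tell logicalrep_read_rel() whether the flag is present. Below is a minimal sketch of that selection. It assumes only the constants this patch defines in logicalproto.h. choose_proto_version() and relation_msg_has_fallbackfull() are hypothetical helpers, not functions in the tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Values from logicalproto.h as amended by this patch. */
#define LOGICALREP_PROTO_VERSION_NUM                 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM          2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM        3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
#define LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM    5

/*
 * Mirrors the ternary chain in set_stream_options(): pick the newest
 * protocol the publisher (server_version, e.g. 190000) understands.
 */
static uint32_t
choose_proto_version(int server_version)
{
    if (server_version >= 190000)
        return LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM;
    if (server_version >= 160000)
        return LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
    if (server_version >= 150000)
        return LOGICALREP_PROTO_TWOPHASE_VERSION_NUM;
    if (server_version >= 140000)
        return LOGICALREP_PROTO_STREAM_VERSION_NUM;
    return LOGICALREP_PROTO_VERSION_NUM;
}

/* RELATION messages carry the fallbackfull flag only from version 5 on. */
static bool
relation_msg_has_fallbackfull(uint32_t proto_version)
{
    assert(proto_version >= LOGICALREP_PROTO_VERSION_NUM);
    return proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM;
}
```

The publisher and subscriber must parse RELATION messages with the same version. An older publisher therefore never sends the flag, and the subscriber must not try to read it.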
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e016f64e0b3..09344d63a85 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -145,6 +145,19 @@ typedef struct RelationSyncEntry
/* are we publishing this rel? */
PublicationActions pubactions;
+ /* Replica identity is default but there is no PK: fall back to full? */
+ bool fallbackfull;
+
+ /*
+ * When replica identity is default, there is no PK and fallbackfull is
+ * false, we should not send any updates/deletes for this relation. But if
+ * the table belongs to another publication that has fallbackfull enabled,
+ * the table can still be updated/deleted and therefore still generates
+ * WAL, so we must skip sending those changes downstream. This flag
+ * indicates that case.
+ */
+ bool block_update_delete;
+
/*
* ExprState array for row filter. Different publication actions don't
* allow multiple expressions to always be combined into one, because
@@ -799,6 +812,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TupleDesc desc = RelationGetDescr(relation);
Bitmapset *columns = relentry->columns;
PublishGencolsType include_gencols_type = relentry->include_gencols_type;
@@ -830,7 +844,9 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, xid, relation, columns,
- include_gencols_type);
+ include_gencols_type,
+ relentry->fallbackfull,
+ data->protocol_version);
OutputPluginWrite(ctx, false);
}
@@ -1519,10 +1535,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_UPDATE:
if (!relentry->pubactions.pubupdate)
return;
+ if (relentry->block_update_delete)
+ return;
break;
case REORDER_BUFFER_CHANGE_DELETE:
if (!relentry->pubactions.pubdelete)
return;
+ if (relentry->block_update_delete)
+ return;
/*
* This is only possible if deletes are allowed even when replica
@@ -2057,6 +2077,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
bool found;
MemoryContext oldctx;
Oid relid = RelationGetRelid(relation);
+ Form_pg_class relform = RelationGetForm(relation);
Assert(RelationSyncCache != NULL);
@@ -2075,6 +2096,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->fallbackfull = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2130,6 +2152,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->fallbackfull = false;
/*
* Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2267,6 +2290,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+ entry->fallbackfull |= pub->pubfallbackfull;
/*
* We want to publish the changes as the top-most ancestor
@@ -2302,6 +2326,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
}
}
+ if (!entry->fallbackfull &&
+ relform->relreplident == REPLICA_IDENTITY_DEFAULT &&
+ !OidIsValid(RelationGetReplicaIndex(relation)))
+ entry->block_update_delete = true;
+ else
+ entry->block_update_delete = false;
+
entry->publish_as_relid = publish_as_relid;
/*
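On the pgoutput.c side, pubfallbackfull is ORed across the subscription's publications that contain the table. UPDATE/DELETE changes are then suppressed when none of those publications enables it and the table has default identity without a PK. The code below is a simplified model of that decision, not the real get_rel_sync_entry()/pgoutput_change() code. It assumes trimmed-down hypothetical structs (PubModel, SyncEntryModel) in place of Publication and RelationSyncEntry. It ignores row filters, column lists, partition ancestors and INSERT/TRUNCATE handling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define REPLICA_IDENTITY_DEFAULT 'd'

/* Hypothetical stand-ins for the real structures. */
typedef struct PubModel
{
    bool pubupdate;
    bool pubdelete;
    bool pubfallbackfull;
} PubModel;

typedef struct SyncEntryModel
{
    bool pubupdate;
    bool pubdelete;
    bool fallbackfull;
    bool block_update_delete;
} SyncEntryModel;

typedef enum ChangeKind { CHANGE_UPDATE, CHANGE_DELETE } ChangeKind;

/*
 * Like get_rel_sync_entry(): OR the flags of the subscribed publications
 * that contain the table, then decide whether to block UPDATE/DELETE.
 */
static SyncEntryModel
build_entry(const PubModel *pubs, size_t npubs, char relreplident, bool has_pk)
{
    SyncEntryModel e = {false, false, false, false};

    assert(npubs == 0 || pubs != NULL);
    for (size_t i = 0; i < npubs; i++)
    {
        e.pubupdate |= pubs[i].pubupdate;
        e.pubdelete |= pubs[i].pubdelete;
        e.fallbackfull |= pubs[i].pubfallbackfull;
    }
    e.block_update_delete = !e.fallbackfull &&
        relreplident == REPLICA_IDENTITY_DEFAULT && !has_pk;
    return e;
}

/* Like the UPDATE/DELETE cases in pgoutput_change(): send or skip? */
static bool
should_send(const SyncEntryModel *e, ChangeKind kind)
{
    bool published = (kind == CHANGE_UPDATE) ? e->pubupdate : e->pubdelete;

    return published && !e->block_update_delete;
}
```

Here the blocking flag is computed once per cache entry rather than per change. That matches the patch's choice of storing block_update_delete in RelationSyncEntry.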
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 368becca899..f58418c2131 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -66,6 +66,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
* if stored generated column data should be published.
*/
char pubgencols;
+
+ /* true if fallbackfull is enabled */
+ bool pubfallbackfull;
} FormData_pg_publication;
/* ----------------
@@ -138,6 +141,7 @@ typedef struct Publication
bool allsequences;
bool pubviaroot;
PublishGencolsType pubgencols_type;
+ bool pubfallbackfull;
PublicationActions pubactions;
} Publication;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 058a955e20c..1898ed2120c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -36,13 +36,17 @@
* LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
* where we support applying large streaming transactions in parallel.
* Introduced in PG16.
+ *
+ * LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM is the minimum protocol version
+ * that includes the fallbackfull flag in RELATION messages.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
+#define LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM 5
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM
/*
* Logical message types
@@ -111,6 +115,7 @@ typedef struct LogicalRepRelation
char **attnames; /* column names */
Oid *atttyps; /* column types */
char replident; /* replica identity */
+ bool fallbackfull; /* publication fallback to full identity */
char relkind; /* remote relation kind */
Bitmapset *attkeys; /* Bitmap of key columns */
} LogicalRepRelation;
@@ -250,8 +255,10 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
bool transactional, const char *prefix, Size sz, const char *message);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel, Bitmapset *columns,
- PublishGencolsType include_gencols_type);
-extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
+ PublishGencolsType include_gencols_type,
+ bool fallbackfull, uint32 proto_version);
+extern LogicalRepRelation *logicalrep_read_rel(StringInfo in,
+ uint32 proto_version);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
extern void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp);
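The new proto_version argument to logicalrep_write_rel()/logicalrep_read_rel() lets both ends agree on whether the flag is on the wire. The proto.c hunk is not part of this excerpt. The byte layout below, with one flag byte right after the replica identity byte, is therefore an assumption for illustration only. MsgBuf, write_identity() and read_identity() are hypothetical stand-ins for StringInfo and the real functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM 5

/* Hypothetical fixed-size buffer standing in for StringInfo. */
typedef struct MsgBuf
{
    uint8_t data[16];
    size_t len;     /* bytes written */
    size_t cursor;  /* next byte to read */
} MsgBuf;

static void
put_byte(MsgBuf *b, uint8_t v)
{
    assert(b->len < sizeof(b->data));
    b->data[b->len++] = v;
}

static uint8_t
get_byte(MsgBuf *b)
{
    assert(b->cursor < b->len);
    return b->data[b->cursor++];
}

/* Writer: emit the flag only when the negotiated version knows about it. */
static void
write_identity(MsgBuf *b, char replident, bool fallbackfull,
               uint32_t proto_version)
{
    put_byte(b, (uint8_t) replident);
    if (proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM)
        put_byte(b, fallbackfull ? 1 : 0);
}

/* Reader: must use the same version, or it would misparse what follows. */
static void
read_identity(MsgBuf *b, char *replident, bool *fallbackfull,
              uint32_t proto_version)
{
    *replident = (char) get_byte(b);
    *fallbackfull = false;  /* older protocols never carry the flag */
    if (proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM)
        *fallbackfull = get_byte(b) != 0;
}
```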
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index efe0f9d6031..a4f5ecfddc5 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -48,6 +48,7 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
Relation partrel, AttrMap *map);
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
+extern bool logicalrep_identity_is_full(Relation relation);
extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
extern Oid GetRelationIdentityOrPK(Relation rel);
--
2.50.1 (Apple Git-155)