From 09f504db0028ca35f4b1cba779a26ce6eb3dec43 Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Wed, 4 Feb 2026 12:26:21 +0530 Subject: [PATCH v40 2/3] handle EXCEPT TABLE correctly with partitioned tables When a publication is created with EXCEPT TABLE, adjust logical replication so that data synchronization and change replication correctly respect exclusions for partitioned tables. On the subscriber side, extend fetch_remote_table_info() to compute the effective set of relations used for the initial COPY. When exclusions are present, the root partitioned table can no longer be used directly; instead, derive the list of non-excluded leaf partitions and combine them with UNION ALL. When no exclusions exist, retain the existing behavior and copy from the root relation as before. This is based on approach 1 discussed at: https://www.postgresql.org/message-id/CAJpy0uD81HRrMYr7S-6AV4W2PtbGKM-nf2D89zsoMHJ9jZssUg@mail.gmail.com This patch is a topup patch on top of 0001 patch. --- doc/src/sgml/ref/create_publication.sgml | 17 +- src/backend/catalog/pg_publication.c | 448 +++++++++++++++++- src/backend/commands/subscriptioncmds.c | 2 + src/backend/replication/logical/tablesync.c | 150 +++++- src/backend/replication/pgoutput/pgoutput.c | 25 +- src/include/catalog/pg_proc.dat | 7 + src/include/replication/worker_internal.h | 6 + .../t/037_rep_changes_except_table.pl | 156 +++--- src/tools/pgindent/typedefs.list | 1 + 9 files changed, 688 insertions(+), 124 deletions(-) diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 1e091bb3c6d..730b9c4bced 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -205,16 +205,15 @@ CREATE PUBLICATION name tables are excluded. - For partitioned tables, when publish_via_partition_root - is set to true, specifying a root partitioned table in - EXCEPT TABLE excludes it and all its partitions from - replication. Specifying a leaf partition has no effect, as its changes are - still replicated via the root partitioned table. When + For partitioned tables, when a table is specified in EXCEPT TABLE, then + changes to that table and all of its partitions (that is, the entire + partition subtree rooted at that table) are not replicated. This behavior + is the same regardless of whether publish_via_partition_root is set to - false, specifying a root partitioned table has no - effect, as changes are replicated via the leaf partitions. Specifying a - leaf partition excludes only that partition from replication. The optional - * has no meaning for partitioned tables. + true or false. The + publish_via_partition_root setting only determines + which relation is used as the publishing relation for replicated changes, + and does not affect exclusion semantics. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 09c69005122..dd8db2165fe 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -468,26 +468,6 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, RelationGetRelationName(targetrel), pub->name))); } - /* - * Handle the case where a partition is excluded by EXCEPT TABLE while - * publish_via_partition_root = true. - */ - if (pub->alltables && pub->pubviaroot && pri->except && - targetrel->rd_rel->relispartition) - ereport(WARNING, - (errmsg("partition \"%s\" might be replicated as publish_via_partition_root is \"%s\"", - RelationGetRelationName(targetrel), "true"))); - - /* - * Handle the case where a partitioned table is excluded by EXCEPT TABLE - * while publish_via_partition_root = false. - */ - if (pub->alltables && !pub->pubviaroot && pri->except && - targetrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(WARNING, - (errmsg("partitioned table \"%s\" might be replicated as publish_via_partition_root is \"%s\"", - RelationGetRelationName(targetrel), "false"))); - check_publication_add_relation(targetrel); /* Validate and translate column names into a Bitmapset of attnums. */ @@ -960,8 +940,8 @@ GetAllPublicationRelations(Publication *pub, char relkind) if (relkind == RELKIND_RELATION) exceptlist = GetAllPublicationExcludedTables(pubid, pubviaroot ? - PUBLICATION_PART_ALL : - PUBLICATION_PART_ROOT); + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); classRel = table_open(RelationRelationId, AccessShareLock); @@ -1205,6 +1185,430 @@ GetPublicationByName(const char *pubname, bool missing_ok) return OidIsValid(oid) ? GetPublication(oid) : NULL; } + +/* Helper: Resolve the immediate parent of a partition */ +static Oid +get_parent_oid(Oid relid) +{ + List *ancestors = get_partition_ancestors(relid); + + if (ancestors == NIL) + return InvalidOid; + return linitial_oid(ancestors); +} + +/* Helper: Check syscache for prexcept flag */ +static bool +is_relid_published_as_except(Oid relid, Oid pubid) +{ + HeapTuple tup; + bool result = false; + + tup = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); + if (HeapTupleIsValid(tup)) + { + Form_pg_publication_rel prform = (Form_pg_publication_rel) GETSTRUCT(tup); + + result = prform->prexcept; + ReleaseSysCache(tup); + } + return result; +} + +/* + * publication_has_any_exception + * + * Returns true if the given publication OID has at least one entry in + * pg_publication_rel marked as an exception (prexcept = true). + */ +static bool +publication_has_any_exception(Oid pubid) +{ + Relation pubRel; + ScanKeyData skey; + SysScanDesc scan; + HeapTuple tup; + bool found = false; + + /* Open pg_publication_rel with AccessShareLock */ + pubRel = table_open(PublicationRelRelationId, AccessShareLock); + + /* + * * Look up by publication OID. Using the index on (prpubid) is highly + * efficient. + */ + ScanKeyInit(&skey, + Anum_pg_publication_rel_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubRel, + PublicationRelPrrelidPrpubidIndexId, + true, NULL, 1, &skey); + + /* We only need to find the first occurrence of prexcept = true */ + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_rel pubrelform = (Form_pg_publication_rel) GETSTRUCT(tup); + + if (pubrelform->prexcept) + { + found = true; + break; + } + } + + systable_endscan(scan); + table_close(pubRel, AccessShareLock); + + return found; +} + +/* + * is_relid_published_explicitly + * + * Checks if the given relation OID is explicitly part of the publication. + * This corresponds to the 'FOR TABLE' syntax. + */ +static bool +is_relid_published_explicitly(Oid relid, Oid pubid) +{ + /* + * Search the syscache for pg_publication_rel using the (relid, pubid) + * index. + */ + return SearchSysCacheExists2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); +} + +/* + * is_schema_published + * + * Checks if the given namespace OID is part of the publication's + * schema list. This corresponds to the 'FOR TABLES IN SCHEMA' syntax. + */ +static bool +is_schema_published(Oid nspid, Oid pubid) +{ + /* + * Search the syscache for pg_publication_namespace using the (nspid, + * pubid) index. + */ + return SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(nspid), + ObjectIdGetDatum(pubid)); +} + +/* + * pg_get_publication_effective_tables + * + * Given a root relation and a list of publications, calculate the set of + * relations that are effectively published. This is necessary for + * "FOR ALL TABLES" publications that use "EXCEPT TABLE" filters. + * + * The function resolves the smallest possible set of ancestors that cover + * all non-excluded leaf partitions. + */ +Datum +pg_get_publication_effective_tables(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + List *results; + + if (SRF_IS_FIRSTCALL()) + { + Oid root_relid = PG_GETARG_OID(0); + ArrayType *pub_names_array = PG_GETARG_ARRAYTYPE_P(1); + MemoryContext oldcontext; + List *pub_oids = NIL; + Datum *pub_datums; + bool *pub_nulls; + int pub_count; + TupleDesc tupdesc; + List *final_output = NIL; + bool has_clean_all_tables_pub = false; + bool has_any_exclusion = false; + ListCell *lc; + + funcctx = SRF_FIRSTCALL_INIT(); + + deconstruct_array(pub_names_array, TEXTOID, -1, false, 'i', + &pub_datums, &pub_nulls, &pub_count); + for (int i = 0; i < pub_count; i++) + { + if (!pub_nulls[i]) + { + char *pubname = TextDatumGetCString(pub_datums[i]); + + pub_oids = lappend_oid(pub_oids, get_publication_oid(pubname, false)); + } + } + + /* + * Determine if we can avoid the expensive expansion. If any + * publication is 'FOR ALL TABLES' with no exceptions, or if + * absolutely no exceptions exist across all pubs, the root is the + * result. + */ + foreach(lc, pub_oids) + { + Oid puboid = lfirst_oid(lc); + HeapTuple pubTup; + Form_pg_publication pubform; + + pubTup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(puboid)); + if (!HeapTupleIsValid(pubTup)) + continue; + + pubform = (Form_pg_publication) GETSTRUCT(pubTup); + + /* Check for any exclusion entries in this publication */ + if (publication_has_any_exception(puboid)) + has_any_exclusion = true; + else if (pubform->puballtables) + { + /* + * Found a publication that includes everything without + * exception + */ + has_clean_all_tables_pub = true; + ReleaseSysCache(pubTup); + break; + } + ReleaseSysCache(pubTup); + } + + /* Return root immediately if no filtering logic is needed */ + if (has_clean_all_tables_pub || !has_any_exclusion) + { + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + final_output = list_make1_oid(root_relid); + MemoryContextSwitchTo(oldcontext); + } + else + { + List *all_tables; + List *all_leaves = NIL; + List *excluded_leaves = NIL; + List *allowed_leaves = NIL; + List *candidate_list = NIL; + + /* Get all the leaf relations */ + all_leaves = GetPubPartitionOptionRelations(all_leaves, + PUBLICATION_PART_LEAF, + root_relid); + + /* + * Identify common excluded relations (Inheritance-aware). A table + * is excluded only if every publication excludes it, either + * explicitly or by excluding one of its ancestors. + */ + foreach(lc, all_leaves) + { + Oid curr_relid = lfirst_oid(lc); + ListCell *lp; + bool is_covered_by_any_pub = false; + + /* Get the inheritance chain for the current table once */ + List *ancestors = get_partition_ancestors(curr_relid); + + foreach(lp, pub_oids) + { + Oid pubid = lfirst_oid(lp); + HeapTuple pubTup; + Form_pg_publication pubform; + bool is_all_tables; + + pubTup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(pubTup)) + continue; + + pubform = (Form_pg_publication) GETSTRUCT(pubTup); + is_all_tables = pubform->puballtables; + ReleaseSysCache(pubTup); + + if (is_all_tables) + { + /* FOR ALL TABLES: Check that it isn't excluded */ + bool is_excluded = false; + + if (is_relid_published_as_except(curr_relid, pubid)) + is_excluded = true; + else + { + ListCell *la; + + foreach(la, ancestors) + { + if (is_relid_published_as_except(lfirst_oid(la), pubid)) + { + is_excluded = true; + break; + } + } + } + + if (!is_excluded) + { + is_covered_by_any_pub = true; + break; + } + } + else + { + /* + * Check if the table itself or its schema is in + * the pub. + */ + if (is_relid_published_explicitly(curr_relid, pubid) || + is_schema_published(get_rel_namespace(curr_relid), pubid)) + { + is_covered_by_any_pub = true; + } + else + { + /* + * Check if any ancestor or an ancestor's schema is + * in the pub. + */ + ListCell *la; + + foreach(la, ancestors) + { + Oid anc_oid = lfirst_oid(la); + + if (is_relid_published_explicitly(anc_oid, pubid) || + is_schema_published(get_rel_namespace(anc_oid), pubid)) + { + is_covered_by_any_pub = true; + break; + } + } + } + } + + /* + * If any publication covers this table, we can stop + * checking others + */ + if (is_covered_by_any_pub) + break; + } + + /* + * Instead of identifying 'excluded', we identify 'allowed'. + * If no publication covers this table, it's effectively + * excluded. + */ + if (!is_covered_by_any_pub) + excluded_leaves = lappend_oid(excluded_leaves, curr_relid); + + list_free(ancestors); + } + + /* Calculate allowed leaves (Total - Excluded) */ + foreach(lc, all_leaves) + { + Oid curr_relid = lfirst_oid(lc); + + if (!list_member_oid(excluded_leaves, curr_relid)) + allowed_leaves = lappend_oid(allowed_leaves, curr_relid); + } + + /* Bottom-Up Collapse. Check if parents can represent children */ + all_tables = find_all_inheritors(root_relid, AccessShareLock, NULL); + candidate_list = list_copy(allowed_leaves); + foreach(lc, all_tables) + { + Oid curr_relid = lfirst_oid(lc); + + if (get_rel_relkind(curr_relid) == RELKIND_PARTITIONED_TABLE) + { + List *branch_tree = find_all_inheritors(curr_relid, AccessShareLock, NULL); + List *branch_leaves = NIL; + ListCell *lcb; + bool all_allowed = true; + + foreach(lcb, branch_tree) + { + Oid b_oid = lfirst_oid(lcb); + + if (get_rel_relkind(b_oid) != RELKIND_PARTITIONED_TABLE) + branch_leaves = lappend_oid(branch_leaves, b_oid); + } + + if (list_length(branch_leaves) == 0) + all_allowed = false; + else + { + foreach(lcb, branch_leaves) + { + if (!list_member_oid(allowed_leaves, lfirst_oid(lcb))) + { + all_allowed = false; + break; + } + } + } + + if (all_allowed) + candidate_list = list_append_unique_oid(candidate_list, curr_relid); + } + } + + /* + * Deduplicate: Only keep the highest level representative of any + * branch + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + foreach(lc, candidate_list) + { + Oid curr_relid = lfirst_oid(lc); + Oid parent_oid = get_parent_oid(curr_relid); + + if (parent_oid == InvalidOid || !list_member_oid(candidate_list, parent_oid)) + final_output = lappend_oid(final_output, curr_relid); + } + + MemoryContextSwitchTo(oldcontext); + } + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + /* Construct a tuple descriptor for the result rows. */ + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "schemaname", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relname", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + funcctx->user_fctx = final_output; + + MemoryContextSwitchTo(oldcontext); + } + + /* SRF Per-call Resume */ + funcctx = SRF_PERCALL_SETUP(); + results = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(results)) + { + Oid current_relid = list_nth_oid(results, (int) funcctx->call_cntr); + HeapTuple rettuple; + Datum values[2]; + bool nulls[2] = {false, false}; + + values[0] = CStringGetTextDatum(get_namespace_name(get_rel_namespace(current_relid))); + values[1] = CStringGetTextDatum(get_rel_name(current_relid)); + + rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple)); + } + + SRF_RETURN_DONE(funcctx); +} + /* * Get information of the tables in the given publication array. * diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0b3c8499b49..804ae2f349e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -2940,6 +2940,8 @@ fetch_relation_list(WalReceiverConn *wrconn, List *publications) pub_names.data); } + elog(LOG, "fetch_relation_list: executing query to fetch effectiverelations: \n%s", + cmd.data); pfree(pub_names.data); res = walrcv_exec(wrconn, cmd.data, column_count, tableRow); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 19a3c21a863..c41fe41ea7a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -716,21 +716,28 @@ copy_read_data(void *outbuf, int minread, int maxread) * message provides during replication. * * This function also returns (a) the relation qualifications to be used in - * the COPY command, and (b) whether the remote relation has published any - * generated column. + * the COPY command, (b) whether the remote relation has published any + * generated column, and (c) computes the effective set of relations to be used + * as COPY sources when exclusions are present. When no exclusions exist, the + * list remains empty and the root relation is used as-is. When exclusions + * exist, the list contains leaf relations that are not excluded and must be + * combined using UNION ALL. */ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, - List **qual, bool *gencol_published) + List **qual, bool *gencol_published, + List **effective_relations) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; + Oid filtertableRow[] = {TEXTOID, TEXTOID}; bool isnull; int natt; + bool is_partition; StringInfo pub_names = NULL; Bitmapset *included_cols = NULL; int server_version = walrcv_server_version(LogRepWorkerWalRcvConn); @@ -740,7 +747,7 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -770,6 +777,8 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + is_partition = DatumGetBool(slot_getattr(slot, 4, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -954,6 +963,75 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, walrcv_clear_result(res); + if (server_version >= 190000 && !is_partition && + lrel->relkind == RELKIND_PARTITIONED_TABLE) + { + resetStringInfo(&cmd); + + /* + * This query recursively traverses the inheritance (partition) tree + * starting from the given table OID and determines which leaf + * relations should be included for replication. Exclusion propagates + * from parent to child, and a relation is also treated as excluded if + * it is explicitly marked with prexcept = true in pg_publication_rel + * for the specified publications. The final result returns only + * non excluded leaf relations. + */ + appendStringInfo(&cmd, "SELECT schemaname, relname FROM pg_get_publication_effective_tables(%u, ARRAY[%s]);", + lrel->remoteid, + pub_names->data); + + elog(LOG, "Executing query to get the tables:\n%s", cmd.data); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(filtertableRow), filtertableRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not get non excluded table list for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err)); + + /* + * Store the tables as a list of schemaname and tablename. + */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + QualifiedRelationName *relinfo = palloc_object(QualifiedRelationName); + + relinfo->schemaname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relinfo->relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *effective_relations = lappend(*effective_relations, relinfo); + + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + + /* + * If there is exactly one item in the effective_relations list and it + * equals the table being processed, that means no actual exclusion + * occurred. + */ + if (list_length(*effective_relations) == 1) + { + QualifiedRelationName *relinfo; + + relinfo = linitial(*effective_relations); + if (strcmp(nspname, relinfo->schemaname) == 0 && + strcmp(relname, relinfo->relname) == 0) + { + pfree(relinfo->schemaname); + pfree(relinfo->relname); + list_free_deep(*effective_relations); + *effective_relations = NIL; + } + } + } + /* * Get relation's row filter expressions. DISTINCT avoids the same * expression of a table in multiple publications from being included @@ -1043,6 +1121,7 @@ copy_table(Relation rel) LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; List *qual = NIL; + List *effective_relations = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; @@ -1054,7 +1133,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel), &lrel, &qual, - &gencol_published); + &gencol_published, &effective_relations); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -1066,12 +1145,64 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); + + if (effective_relations && list_length(effective_relations)) + { + bool first = true; + + /* + * Build a single COPY command to synchronize all resolved relations + * into the root table. + * + * The array 'effective_relations' contains the leaf tables of + * partition hierarchies, with excluded subtrees removed according to + * the EXCEPT clauses. This applies only when + * 'publish_via_partition_root' is enabled, since the initial sync must + * route all changes to the root table. + * + * We construct a UNION ALL query that combines data from multiple leaf + * relations into one sub-COPY statement, ensuring all rows are copied + * consistently into the root table. + */ + appendStringInfoString(&cmd, "COPY (\n"); + foreach_ptr(QualifiedRelationName, relinfo, effective_relations) + { + if (!first) + appendStringInfoString(&cmd, "UNION ALL\n"); + + first = false; + + appendStringInfoString(&cmd, "SELECT "); + + /* If the table has columns, then specify the columns */ + if (lrel.natts) + { + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + } + else + appendStringInfoString(&cmd, " * "); + + appendStringInfo(&cmd, " FROM %s\n", + quote_qualified_identifier(relinfo->schemaname, + relinfo->relname)); + } + + appendStringInfoString(&cmd, ")\n"); + appendStringInfoString(&cmd, "TO STDOUT"); + } /* Regular or partitioned table with no row filter or generated columns */ - if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE) - && qual == NIL && !gencol_published) + else if ((lrel.relkind == RELKIND_RELATION || + lrel.relkind == RELKIND_PARTITIONED_TABLE) && + qual == NIL && !gencol_published) { appendStringInfo(&cmd, "COPY %s", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + quote_qualified_identifier(lrel.nspname, lrel.relname)); /* If the table has columns, then specify the columns */ if (lrel.natts) @@ -1157,6 +1288,7 @@ copy_table(Relation rel) } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); + elog(LOG, "Tablesync worker: Executing query to get the initial sync data:\n%s", cmd.data); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9e1dc764d65..f01c9a40faa 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -2208,13 +2208,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) * If this is a FOR ALL TABLES publication and it has an EXCEPT * TABLE list: * - * 1. If pubviaroot is set and the relation is a partition, check - * whether the partition root is included in the EXCEPT TABLE - * list. If so, do not publish the change. - * - * 2. If pubviaroot is not set, check whether the relation itself - * is included in the EXCEPT TABLE list. If so, do not publish the - * change. + * If the relation is a partition, check whether the current + * relation or any of the ancestors is included in the EXCEPT + * TABLE list. If so, do not publish the change. This is done + * irrespective of pubviaroot setting. * * "Do not publish the change" is achieved by keeping the variable * "publish" set to false. And eventually, entry->pubactions will @@ -2224,15 +2221,21 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { List *exceptpubids = NIL; - if (pub->pubviaroot && am_partition) + if (am_partition) { List *ancestors = get_partition_ancestors(relid); - pub_relid = llast_oid(ancestors); - ancestor_level = list_length(ancestors); + foreach_oid(ancestor, ancestors) + GetRelationPublications(ancestor, NULL, &exceptpubids); + + if (pub->pubviaroot) + { + pub_relid = llast_oid(ancestors); + ancestor_level = list_length(ancestors); + } } - GetRelationPublications(pub_relid, NULL, &exceptpubids); + GetRelationPublications(relid, NULL, &exceptpubids); if (!list_member_oid(exceptpubids, pub->oid)) publish = true; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 5e5e33f64fc..6b54c290bd3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12336,6 +12336,13 @@ proname => 'pg_relation_is_publishable', provolatile => 's', prorettype => 'bool', proargtypes => 'regclass', prosrc => 'pg_relation_is_publishable' }, +{ oid => '9002', descr => 'get effective tables for publication', + proname => 'pg_get_publication_effective_tables', prorows => '1000', + proretset => 't', provolatile => 's', + prorettype => 'record', proargtypes => 'oid _text', + proallargtypes => '{oid,_text,text,text}', proargmodes => '{i,i,o,o}', + proargnames => '{root_relid,pub_names,schemaname,relname}', + prosrc => 'pg_get_publication_effective_tables' }, # rls { oid => '3298', diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index c1285fdd1bc..cab97531276 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -234,6 +234,12 @@ typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; +typedef struct QualifiedRelationName +{ + char *schemaname; + char *relname; +} QualifiedRelationName; + /* Main memory context for apply worker. Permanent during worker lifetime. */ extern PGDLLIMPORT MemoryContext ApplyContext; diff --git a/src/test/subscription/t/037_rep_changes_except_table.pl b/src/test/subscription/t/037_rep_changes_except_table.pl index f4c9714bf1c..6629f3115fb 100644 --- a/src/test/subscription/t/037_rep_changes_except_table.pl +++ b/src/test/subscription/t/037_rep_changes_except_table.pl @@ -90,8 +90,11 @@ $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_schema"); $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE sch1.t1(a int) PARTITION BY RANGE(a); - CREATE TABLE sch1.part1 PARTITION OF sch1.t1 FOR VALUES FROM (0) TO (5); - CREATE TABLE sch1.part2 PARTITION OF sch1.t1 FOR VALUES FROM (6) TO (10); + CREATE TABLE sch1.part1 PARTITION OF sch1.t1 FOR VALUES FROM (0) TO (100); + CREATE TABLE sch1.part2(a int) PARTITION BY RANGE(a); + CREATE TABLE sch1.part2_1 PARTITION OF sch1.part2 FOR VALUES FROM (101) TO (150); + CREATE TABLE sch1.part2_2 PARTITION OF sch1.part2 FOR VALUES FROM (151) TO (200); + ALTER TABLE sch1.t1 ATTACH PARTITION sch1.part2 FOR VALUES FROM (101) TO (200); )); $node_subscriber->safe_psql( @@ -99,142 +102,149 @@ $node_subscriber->safe_psql( CREATE TABLE sch1.t1(a int); CREATE TABLE sch1.part1(a int); CREATE TABLE sch1.part2(a int); + CREATE TABLE sch1.part2_1(a int); + CREATE TABLE sch1.part2_2(a int); )); -# EXCEPT TABLE (sch1.part1) with publish_via_partition_root = false -# Excluding a partition while publish_via_partition_root = false prevents -# replication of rows inserted into the partitioned table for that particular -# partition. +# Excluding the root partitioned table excludes all its partitions as well when +# publish_via_partition_root = false. $node_publisher->safe_psql( 'postgres', qq( - CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part1) WITH (publish_via_partition_root = false); - INSERT INTO sch1.t1 VALUES (1), (6); + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root = false); + INSERT INTO sch1.t1 VALUES (1), (101), (151); )); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" ); $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); $node_publisher->safe_psql('postgres', - "INSERT INTO sch1.t1 VALUES (2), (7);"); + "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" +); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (2), (102), (152)"); + +# Verify that data inserted to the partitioned table is not published when it is +# excluded with publish_via_partition_root = true. +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) = 0 FROM pg_logical_slot_get_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'tap_pub_part')" +); $node_publisher->wait_for_catchup('tap_sub_part'); +# Check that no rows are replicated to subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); -is($result, qq(), 'check rows on partitioned table'); +is($result, qq(), 'check rows on root table'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); -is($result, qq(), 'check rows on excluded partition'); +is($result, qq(), 'check rows on table sch1.part1'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); -is( $result, qq(6 -7), 'check rows on other partition'); +is($result, qq(), 'check rows on table sch1.part2'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_1"); +is($result, qq(), 'check rows on table sch1.part2_1'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_2"); +is($result, qq(), 'check rows on table sch1.part2_2'); -$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); -$node_publisher->wait_for_catchup('tap_sub_part'); $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); -# EXCEPT TABLE (sch1.t1) with publish_via_partition_root = false -# Excluding the partitioned table still allows rows inserted into the -# partitioned table to be replicated via its partitions. +# Excluding the root partitioned table excludes all its partitions as well when +# publish_via_partition_root = true. $node_publisher->safe_psql( 'postgres', qq( - CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root = false); - INSERT INTO sch1.t1 VALUES (1), (6); + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root = true); )); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" ); $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); $node_publisher->safe_psql('postgres', - "INSERT INTO sch1.t1 VALUES (2), (7);"); + "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" +); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (3), (103), (153);"); + +# Verify that data inserted to the partitioned table is not published when it is +# excluded with publish_via_partition_root = true. +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) = 0 FROM pg_logical_slot_get_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'tap_pub_part')" +); $node_publisher->wait_for_catchup('tap_sub_part'); +# Check that no rows are replicated to subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); -is($result, qq(), 'check rows on partitioned table'); +is($result, qq(), 'check rows on root table'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); -is( $result, qq(1 -2), 'check rows on first partition'); +is($result, qq(), 'check rows on table sch1.part1'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); -is( $result, qq(6 -7), 'check rows on second partition'); +is($result, qq(), 'check rows on table sch1.part2'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_1"); +is($result, qq(), 'check rows on table sch1.part2_1'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_2"); +is($result, qq(), 'check rows on table sch1.part2_2'); -$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); -$node_publisher->wait_for_catchup('tap_sub_part'); $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); $node_publisher->safe_psql('postgres', "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" ); -# EXCEPT TABLE (sch1.t1) with publish_via_partition_root = true -# When the partitioned table is excluded and publish_via_partition_root is true, -# no rows from the table or its partitions are replicated. +# Excluding one of the child partition table with +# publish_via_partition_root = true should replicate the other partitions. $node_publisher->safe_psql( 'postgres', qq( - CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root); - INSERT INTO sch1.t1 VALUES (1), (6); + TRUNCATE sch1.t1; + INSERT INTO sch1.t1 VALUES (3), (103), (153); + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part2) WITH (publish_via_partition_root = true); +)); +$node_subscriber->safe_psql( + 'postgres', qq( + TRUNCATE sch1.t1; + CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part; )); -$node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" -); $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); $node_publisher->safe_psql('postgres', - "INSERT INTO sch1.t1 VALUES (2), (7);"); -$node_publisher->wait_for_catchup('tap_sub_part'); + "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" +); # Verify that data inserted to the partitioned table is not published when it is # excluded with publish_via_partition_root = true. $result = $node_publisher->safe_psql('postgres', "SELECT count(*) = 0 FROM pg_logical_slot_get_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'tap_pub_part')" ); -is($result, qq(t), 'check no changes for excluded table in replication slot'); +$node_publisher->wait_for_catchup('tap_sub_part'); +# Check that table data 103 and 153 which is present in sch1.sch1.part2 should +# not be replicated. $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); -is($result, qq(), 'check rows on partitioned table'); +is($result, qq(3), 'check rows on root table'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); -is($result, qq(), 'check rows on first partition'); +is($result, qq(), 'check rows on table sch1.part1'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); -is($result, qq(), 'check rows on second partition'); - -$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); -$node_publisher->wait_for_catchup('tap_sub_part'); -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); -$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); - -# EXCEPT TABLE (sch1.part1) with publish_via_partition_root = true -# When a partition is excluded but publish_via_partition_root is true, -# rows published through the partitioned table can still be replicated. -$node_publisher->safe_psql( - 'postgres', qq( - CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part1) WITH (publish_via_partition_root); - INSERT INTO sch1.t1 VALUES (1), (6) -)); -$node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" -); -$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); -$node_publisher->safe_psql('postgres', - "INSERT INTO sch1.t1 VALUES (2), (7);"); -$node_publisher->wait_for_catchup('tap_sub_part'); +is($result, qq(), 'check rows on table sch1.part2'); $result = - $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1 ORDER BY a"); -is( $result, qq(1 -2 -6 -7), 'check rows on partitioned table'); + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_1"); +is($result, qq(), 'check rows on table sch1.part2_1'); -$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); -is($result, qq(), 'check rows on excluded partition'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_2"); +is($result, qq(), 'check rows on table sch1.part2_2'); -$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); -is($result, qq(), 'check rows on other partition'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); -$node_subscriber->stop('fast'); $node_publisher->stop('fast'); done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9f5ee8fd482..b54fa84a025 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2418,6 +2418,7 @@ QTNode QUERYTYPE QualCost QualItem +QualifiedRelationName Query QueryCompletion QueryDesc -- 2.34.1