From 68403578523c96943c3616dbbdb2a82ad8863244 Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Tue, 17 Jun 2025 12:12:24 +0530 Subject: [PATCH v12 3/3] Skip publishing the columns specified in FOR TABLE EXCEPT This patch introduces syntax to exclude columns of tables from publication. Syntax: FOR TABLE tabname EXCEPT (column_list) It can be used with CREATE/ ALTER PUBLICATION. Eg: CREATE PUBLICATION pubname FOR TABLE tabname EXCEPT (exclude_column_list) ALTER PUBLICATION pubname ADD TABLE tabname EXCEPT (exclude_column_list) --- doc/src/sgml/catalogs.sgml | 14 ++ doc/src/sgml/ref/alter_publication.sgml | 10 +- doc/src/sgml/ref/create_publication.sgml | 18 ++- src/backend/catalog/pg_publication.c | 135 +++++++++++++++++- src/backend/commands/publicationcmds.c | 73 +++++++++- src/backend/parser/gram.y | 60 ++++++++ src/backend/replication/pgoutput/pgoutput.c | 47 +++++- src/bin/pg_dump/pg_dump.c | 39 ++++- src/bin/pg_dump/pg_dump.h | 1 + src/include/catalog/pg_publication.h | 9 +- src/include/catalog/pg_publication_rel.h | 1 + src/include/nodes/parsenodes.h | 1 + src/test/regress/expected/publication.out | 65 +++++++++ src/test/regress/sql/publication.sql | 45 ++++++ .../t/036_rep_changes_except_table.pl | 60 +++++++- 15 files changed, 560 insertions(+), 18 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 4e37c928b44..544998a1725 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6589,6 +6589,20 @@ SCRAM-SHA-256$<iteration count>:&l A null value indicates that all columns are published. + + + + prexcludeattrs int2vector + (references pg_attribute.attnum) + + + This is an array of values that indicates which table columns are + excluded from the publication. For example, a value of + 1 3 would mean that the columns except the first and + the third columns are published. + A null value indicates that no columns are excluded from being published. 
+ + diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index 37e2c84bc10..70b04fc7320 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -32,7 +32,7 @@ ALTER PUBLICATION name RESET where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] { [ [ ( column_name [, ... ] ) ] | [ EXCEPT ( column_name [, ... ] ) ] ] } [ WHERE ( expression ) ] [, ... ] TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] where exception_object is: @@ -260,6 +260,14 @@ ALTER PUBLICATION production_publication ADD ALL TABLES EXCEPT users, department production_publication: ALTER PUBLICATION production_publication ADD TABLE users, departments, TABLES IN SCHEMA production; + + + + Alter publication production_publication to add table + users except column + security_pin: + +ALTER PUBLICATION production_publication ADD TABLE users EXCEPT (security_pin); diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 7fd8872db5f..bb44a20b28d 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -28,7 +28,7 @@ CREATE PUBLICATION name where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] { [ [ ( column_name [, ... ] ) ] | [ EXCEPT ( column_name [, ... ] ) ] ] } [ WHERE ( expression ) ] [, ... ] TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] where exception_object is: @@ -103,6 +103,14 @@ CREATE PUBLICATION name lists. + + When a column list is specified with EXCEPT, the named columns are not + replicated. The excluded column list cannot contain generated columns. The + column list and excluded column list cannot be specified together. 
+ Specifying an excluded column list has no effect on TRUNCATE + commands. + + Only persistent base tables and partitioned tables can be part of a publication. Temporary tables, unlogged tables, foreign tables, @@ -474,6 +482,14 @@ CREATE PUBLICATION mypublication FOR ALL TABLES EXCEPT users, departments; CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname); + + + Create a publication that publishes all changes for table users + except changes for column security_pin: + +CREATE PUBLICATION users_safe FOR TABLE users EXCEPT (security_pin); + + diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ec580e3b050..8fd9ac84451 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -302,6 +302,53 @@ check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, return found; } +/* + * Returns true if the relation has an excluded column list associated with + * the publication, false otherwise. + * + * If an excluded column list is found, the corresponding bitmap is returned + * through the cols parameter, if provided. The bitmap is constructed within + * the given memory context (mcxt). + */ + +bool +check_and_fetch_exclude_column_list(Publication *pub, Oid relid, MemoryContext mcxt, + Bitmapset **cols) +{ + HeapTuple cftuple; + bool found = false; + + if (pub->alltables) + return false; + + cftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(cftuple)) + { + Datum cfdatum; + bool isnull; + + /* Lookup the column list attribute. */ + cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, + Anum_pg_publication_rel_prexcludeattrs, &isnull); + + /* Was a column list found? */ + if (!isnull) + { + /* Build the column list bitmap in the given memory context. 
*/ + if (cols) + *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt); + + found = true; + } + + ReleaseSysCache(cftuple); + } + + return found; +} + /* * Gets the relations based on the publication partition option for a specified * relation. @@ -449,6 +496,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, Oid relid = RelationGetRelid(targetrel); Oid pubreloid; Bitmapset *attnums; + Bitmapset *excludeattnums; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; @@ -481,6 +529,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, /* Validate and translate column names into a Bitmapset of attnums. */ attnums = pub_collist_validate(pri->relation, pri->columns); + /* + * Validate and translate excluded column names into a Bitmapset of + * attnums. + */ + excludeattnums = pub_exclude_collist_validate(pri->relation, + pri->exclude_columns); + /* Form a tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -507,6 +562,11 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, else nulls[Anum_pg_publication_rel_prattrs - 1] = true; + if (pri->exclude_columns) + values[Anum_pg_publication_rel_prexcludeattrs - 1] = PointerGetDatum(attnumstoint2vector(excludeattnums)); + else + nulls[Anum_pg_publication_rel_prexcludeattrs - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -609,6 +669,58 @@ pub_collist_validate(Relation targetrel, List *columns) return set; } +/* + * pub_exclude_collist_validate + * Process and validate the 'excluded columns' list and ensure the columns + * are all valid to exclude from publication. Checks for and raises an + * ERROR for any unknown columns, system columns, duplicate columns, or + * generated columns. + * + * Looks up each column's attnum and returns a 0-based Bitmapset of the + * corresponding attnums. 
+ */ +Bitmapset * +pub_exclude_collist_validate(Relation targetrel, List *exclude_columns) +{ + Bitmapset *set = NULL; + ListCell *lc; + TupleDesc tupdesc = RelationGetDescr(targetrel); + + foreach(lc, exclude_columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel))); + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot use system column \"%s\" in publication except column list", + colname)); + + if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot use generated column \"%s\" in publication except column list", + colname)); + + if (bms_is_member(attnum, set)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("duplicate column \"%s\" in publication except column list", + colname)); + + set = bms_add_member(set, attnum); + } + + return set; +} + /* * Transform a column list (represented by an array Datum) to a bitmapset. * @@ -646,10 +758,12 @@ pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt) * Returns a bitmap representing the columns of the specified table. * * Generated columns are included if include_gencols_type is - * PUBLISH_GENCOLS_STORED. + * PUBLISH_GENCOLS_STORED. Columns that are in the excludecols are excluded from + * the column list. 
*/ Bitmapset * -pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type) +pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type, + Bitmapset *excludecols) { Bitmapset *result = NULL; TupleDesc desc = RelationGetDescr(relation); @@ -672,6 +786,9 @@ pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type) continue; } + if (excludecols && bms_is_member(att->attnum, excludecols)) + continue; + result = bms_add_member(result, att->attnum); } @@ -1263,6 +1380,9 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) Oid schemaid = get_rel_namespace(relid); Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0}; bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0}; + Datum excludeattnums_datum; + Bitmapset *excludeattnums = NULL; + bool isnull; /* * Form tuple with appropriate data. @@ -1296,6 +1416,13 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, Anum_pg_publication_rel_prqual, &(nulls[3])); + + /* get the excluded column list */ + excludeattnums_datum = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, + Anum_pg_publication_rel_prexcludeattrs, + &isnull); + if (!isnull) + excludeattnums = pub_collist_to_bitmapset(NULL, excludeattnums_datum, NULL); } else { @@ -1335,6 +1462,10 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) continue; } + /* Skip columns that are part of excluded column list */ + if (excludeattnums && bms_is_member(att->attnum, excludeattnums)) + continue; + attnums[nattnums++] = att->attnum; } diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 5194b2fb6e2..e850c2345ea 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -358,7 +358,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, * This function evaluates two conditions: * * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered - * by the column list. 
If any column is missing, *invalid_column_list is set + * by the column list and not part of excluded column list. If any column is + * missing or is part of exclude column list, *invalid_column_list is set * to true. * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY * are published, either by being explicitly named in the column list or, if @@ -378,6 +379,7 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, Oid publish_as_relid = RelationGetRelid(relation); Bitmapset *idattrs; Bitmapset *columns = NULL; + Bitmapset *exclude_columns = NULL; TupleDesc desc = RelationGetDescr(relation); Publication *pub; int x; @@ -405,11 +407,15 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, /* Fetch the column list */ pub = GetPublication(pubid); check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns); + check_and_fetch_exclude_column_list(pub, publish_as_relid, NULL, &exclude_columns); if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) { - /* With REPLICA IDENTITY FULL, no column list is allowed. */ - *invalid_column_list = (columns != NULL); + /* + * With REPLICA IDENTITY FULL, no column list and no excluded column + * list is allowed. + */ + *invalid_column_list = (columns != NULL || exclude_columns != NULL); /* * As we don't allow a column list with REPLICA IDENTITY FULL, the @@ -471,6 +477,16 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, break; } + /* + * REPLICA IDENTITY should not contain columns which are + * excluded from the publication. 
+ */ + if (exclude_columns && bms_is_member(att->attnum, exclude_columns)) + { + *invalid_column_list = true; + break; + } + /* Skip validating the column list since it is not defined */ continue; } @@ -798,7 +814,7 @@ CheckPubRelationColumnList(char *pubname, List *tables, { PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); - if (pri->columns == NIL) + if (pri->columns == NIL && pri->exclude_columns == NIL) continue; /* @@ -1043,6 +1059,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, char *relname; bool has_rowfilter; bool has_collist; + bool has_exclude_collist; /* * Beware: we don't have lock on the relations, so cope silently @@ -1056,7 +1073,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, continue; has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL); has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL); - if (!has_rowfilter && !has_collist) + has_exclude_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prexcludeattrs, NULL); + + if (!has_rowfilter && !has_collist && !has_exclude_collist) { ReleaseSysCache(rftuple); continue; @@ -1083,6 +1102,14 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, stmt->pubname), errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.", relname, "publish_via_partition_root"))); + if (has_exclude_collist) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set parameter \"%s\" to false for publication \"%s\"", + "publish_via_partition_root", + stmt->pubname), + errdetail("The publication contains a except column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.", + relname, "publish_via_partition_root"))); Assert(has_collist); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1443,6 +1470,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, 
HeapTuple tup, HeapTuple rftuple; Node *oldrelwhereclause = NULL; Bitmapset *oldcolumns = NULL; + Bitmapset *oldexcludecolumns = NULL; /* look up the cache for the old relmap */ rftuple = SearchSysCache2(PUBLICATIONRELMAP, @@ -1458,6 +1486,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, bool isnull = true; Datum whereClauseDatum; Datum columnListDatum; + Datum excludeColumnListDatum; /* Load the WHERE clause for this table. */ whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, @@ -1474,6 +1503,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (!isnull) oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL); + /* Transform the int2vector exclude column list to a bitmap. */ + excludeColumnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prexcludeattrs, + &isnull); + + if (!isnull) + oldexcludecolumns = pub_collist_to_bitmapset(NULL, excludeColumnListDatum, NULL); + ReleaseSysCache(rftuple); } @@ -1482,6 +1519,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PublicationRelInfo *newpubrel; Oid newrelid; Bitmapset *newcolumns = NULL; + Bitmapset *newexcludecolumns = NULL; newpubrel = (PublicationRelInfo *) lfirst(newlc); newrelid = RelationGetRelid(newpubrel->relation); @@ -1495,6 +1533,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, newcolumns = pub_collist_validate(newpubrel->relation, newpubrel->columns); + newexcludecolumns = pub_exclude_collist_validate(newpubrel->relation, + newpubrel->exclude_columns); + /* * Check if any of the new set of relations matches with the * existing relations in the publication. 
Additionally, if the @@ -1505,7 +1546,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (newrelid == oldrelid) { if (equal(oldrelwhereclause, newpubrel->whereClause) && - bms_equal(oldcolumns, newcolumns)) + bms_equal(oldcolumns, newcolumns) && + bms_equal(oldexcludecolumns, newexcludecolumns)) { found = true; break; @@ -1522,6 +1564,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; oldrel->columns = NIL; + oldrel->exclude_columns = NIL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1596,6 +1639,17 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, stmt->pubname), errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication.")); + /* + * Disallow adding schema if exclude column list is already part + * of the publication. See CheckPubRelationColumnList. + */ + if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prexcludeattrs, NULL)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema to publication \"%s\"", + stmt->pubname), + errdetail("Schemas cannot be added if any tables that specify an except column list are already part of the publication.")); + ReleaseSysCache(coltuple); } @@ -1922,6 +1976,7 @@ OpenTableList(List *tables) pub_rel->whereClause = t->whereClause; pub_rel->columns = t->columns; pub_rel->except = t->except; + pub_rel->exclude_columns = t->exclude_columns; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -1995,6 +2050,7 @@ OpenTableList(List *tables) /* child inherits column list from parent */ pub_rel->columns = t->columns; pub_rel->except = t->except; + pub_rel->exclude_columns = t->exclude_columns; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); @@ -2114,6 +2170,11 @@ PublicationDropTables(Oid pubid, List *rels, bool 
missing_ok) errcode(ERRCODE_SYNTAX_ERROR), errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); + if (pubrel->exclude_columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("except column list must not be specified in ALTER PUBLICATION ... DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 89ac0495ce8..1be4298bce7 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -447,6 +447,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list drop_option_list pub_obj_list except_pub_obj_list + opt_exclude_column_list %type returning_clause %type returning_option @@ -4415,6 +4416,10 @@ opt_column_list: | /*EMPTY*/ { $$ = NIL; } ; +opt_exclude_column_list: + '(' columnList ')' { $$ = $2; } + ; + columnList: columnElem { $$ = list_make1($1); } | columnList ',' columnElem { $$ = lappend($1, $3); } @@ -10681,6 +10686,15 @@ PublicationObjSpec: $$->pubtable->whereClause = $4; $$->location = @1; } + | TABLE relation_expr EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; + $$->pubtable->exclude_columns = $4; + $$->pubtable->whereClause = $5; + } | TABLES IN_P SCHEMA ColId { $$ = makeNode(PublicationObjSpec); @@ -10721,6 +10735,33 @@ PublicationObjSpec: } $$->location = @1; } + | ColId EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + /* + * If either a row filter or exclude column list is + * specified, create a PublicationTable object. 
+ */ + if ($3 || $4) + { + /* + * The OptWhereClause must be stored here but it is + * valid only for tables. For non-table objects, an + * error will be thrown later via + * preprocess_pubobj_list(). + */ + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->exclude_columns = $3; + $$->pubtable->whereClause = $4; + } + else + { + $$->name = $1; + } + $$->location = @1; + } | ColId indirection opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); @@ -10731,6 +10772,16 @@ PublicationObjSpec: $$->pubtable->whereClause = $4; $$->location = @1; } + | ColId indirection EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->exclude_columns = $4; + $$->pubtable->whereClause = $5; + $$->location = @1; + } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ | extended_relation_expr opt_column_list OptWhereClause { @@ -10741,6 +10792,15 @@ PublicationObjSpec: $$->pubtable->columns = $2; $$->pubtable->whereClause = $3; } + | extended_relation_expr EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $1; + $$->pubtable->exclude_columns = $3; + $$->pubtable->whereClause = $4; + } | CURRENT_SCHEMA { $$ = makeNode(PublicationObjSpec); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 5512b4cba7f..f36c361abd5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -185,6 +185,9 @@ typedef struct RelationSyncEntry * row filter expressions, column list, etc. 
*/ MemoryContext entry_cxt; + + /* Indicate if no column is included in the publication */ + bool no_cols_published; } RelationSyncEntry; /* @@ -1099,6 +1102,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, bool first = true; Relation relation = RelationIdGetRelation(entry->publish_as_relid); bool found_pub_collist = false; + bool found_pub_exclude_collist = false; Bitmapset *relcols = NULL; pgoutput_ensure_entry_cxt(data, entry); @@ -1120,12 +1124,32 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { Publication *pub = lfirst(lc); Bitmapset *cols = NULL; + Bitmapset *excludecols = NULL; /* Retrieve the bitmap of columns for a column list publication. */ found_pub_collist |= check_and_fetch_column_list(pub, entry->publish_as_relid, entry->entry_cxt, &cols); + /* Retrieve the bitmap of exclude columns for the publication. */ + found_pub_exclude_collist |= check_and_fetch_exclude_column_list(pub, + entry->publish_as_relid, + entry->entry_cxt, &excludecols); + + /* + * cols and exclude cols can't appear together. Syntax for it is not + * supported. If column list is not present check for excluded column + * list and construct a corresponding column list. + */ + if (!cols && found_pub_exclude_collist) + { + MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); + + cols = pub_form_cols_map(relation, + entry->include_gencols_type, excludecols); + MemoryContextSwitchTo(oldcxt); + } + /* * For non-column list publications — e.g. 
TABLE (without a column * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns @@ -1144,7 +1168,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); relcols = pub_form_cols_map(relation, - entry->include_gencols_type); + entry->include_gencols_type, NULL); MemoryContextSwitchTo(oldcxt); } @@ -1155,8 +1179,11 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { entry->columns = cols; first = false; + + if (excludecols && !cols) + entry->no_cols_published = true; } - else if (!bms_equal(entry->columns, cols)) + else if ((entry->no_cols_published && cols) || !bms_equal(entry->columns, cols)) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different column lists for table \"%s.%s\" in different publications", @@ -1165,10 +1192,11 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, } /* loop all subscribed publications */ /* - * If no column list publications exist, columns to be published will be - * computed later according to the 'publish_generated_columns' parameter. + * If no column list or excluded column list publications exist, columns + * to be published will be computed later according to the + * 'publish_generated_columns' parameter. */ - if (!found_pub_collist) + if (!found_pub_collist && !found_pub_exclude_collist) entry->columns = NULL; RelationClose(relation); @@ -1480,6 +1508,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry = get_rel_sync_entry(data, relation); + /* + * If all columns of a table is present in the exclude column list. Skip + * publishing the changes. 
+ */ + if (relentry->no_cols_published) + return; + /* First check the table filter */ switch (action) { @@ -2057,6 +2092,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->publish_as_relid = InvalidOid; entry->columns = NULL; entry->attrmap = NULL; + entry->no_cols_published = false; } /* Validate the entry */ @@ -2106,6 +2142,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + entry->no_cols_published = false; /* * Tuple slots cleanups. (Will be rebuilt later if needed). diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 92db5ca8d97..29364603130 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4706,6 +4706,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_prrelqual; int i_prattrs; int i_prexcept; + int i_prexcludeattrs; int i, j, ntups; @@ -4723,7 +4724,15 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* FIXME: 180000 should be changed to 190000 later for PG19. 
*/ if (fout->remoteVersion >= 180000) - appendPQExpBufferStr(query, " prexcept,\n"); + appendPQExpBufferStr(query, " prexcept, " + "(CASE\n" + " WHEN pr.prexcludeattrs IS NOT NULL THEN\n" + " (SELECT array_agg(attname)\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prexcludeattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prexcludeattrs[s])\n" + " ELSE NULL END) prexcludeattrs, \n"); else appendPQExpBufferStr(query, " false AS prexcept,\n"); @@ -4755,6 +4764,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_prrelqual = PQfnumber(res, "prrelqual"); i_prattrs = PQfnumber(res, "prattrs"); i_prexcept = PQfnumber(res, "prexcept"); + i_prexcludeattrs = PQfnumber(res, "prexcludeattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4822,6 +4832,30 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) else pubrinfo[j].pubrattrs = NULL; + if (i_prexcludeattrs >= 0 && !PQgetisnull(res, i, i_prexcludeattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer excludeattribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prexcludeattrs), + &attnames, &nattnames)) + pg_fatal("could not parse %s array", "prexcludeattrs"); + excludeattribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(excludeattribs, ", "); + + appendPQExpBufferStr(excludeattribs, fmtId(attnames[k])); + } + pubrinfo[j].pubrexcludeattrs = excludeattribs->data; + free(excludeattribs); /* but not excludeattribs->data */ + free(attnames); + } + else + pubrinfo[j].pubrexcludeattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4907,6 +4941,9 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) if (pubrinfo->pubrattrs) appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + if (pubrinfo->pubrexcludeattrs) + 
appendPQExpBuffer(query, " EXCEPT (%s)", pubrinfo->pubrexcludeattrs); + if (pubrinfo->pubrelqual) { /* diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 096f29346d8..e01c2d1afbd 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -681,6 +681,7 @@ typedef struct _PublicationRelInfo TableInfo *pubtable; char *pubrelqual; char *pubrattrs; + char *pubrexcludeattrs; } PublicationRelInfo; /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 33b771990bd..5344559c88e 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -140,6 +140,7 @@ typedef struct PublicationRelInfo Node *whereClause; List *columns; bool except; + List *exclude_columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -181,15 +182,21 @@ extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); extern bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols); +extern bool check_and_fetch_exclude_column_list(Publication *pub, Oid relid, + MemoryContext mcxt, + Bitmapset **cols); extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists); extern Bitmapset *pub_collist_validate(Relation targetrel, List *columns); +extern Bitmapset *pub_exclude_collist_validate(Relation targetrel, + List *exclude_columns); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists); extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt); extern Bitmapset *pub_form_cols_map(Relation relation, - PublishGencolsType include_gencols_type); + PublishGencolsType include_gencols_type, + Bitmapset *excludecols); #endif /* PG_PUBLICATION_H */ diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index e7d7f3ba85c..4c1b4ddbddc 100644 --- 
a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -36,6 +36,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ int2vector prattrs; /* columns to replicate */ + int2vector prexcludeattrs; /* columns to exclude */ #endif } FormData_pg_publication_rel; diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 61a0b2ccf38..f148c8e2323 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4236,6 +4236,7 @@ typedef struct PublicationTable Node *whereClause; /* qualifications */ List *columns; /* List of columns in a publication table */ bool except; /* exclude the relation */ + List *exclude_columns; /* List of columns to be excluded */ } PublicationTable; /* diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 5d025328704..a274b3cff31 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -2125,6 +2125,71 @@ SET ROLE regress_publication_user; DROP PUBLICATION testpub_reset; DROP TABLE pub_sch1.tbl1; DROP TABLE pub_sch1.tbl2; +-- ====================================================== +-- Test EXCEPT columns for CREATE PUBLICATION +SET client_min_messages = 'ERROR'; +CREATE TABLE pub_test_except1 (a int NOT NULL, b int, c int NOT NULL, d int); +CREATE TABLE pub_sch1.pub_test_except2 (a int, b int, c int, d int); +CREATE TABLE pub_test_except3 (a int, gen1 int GENERATED ALWAYS AS (a * 2) STORED); +-- Verify that publication is created with EXCEPT +CREATE PUBLICATION testpub_except FOR TABLE pub_test_except1, pub_sch1.pub_test_except2 EXCEPT (b, c); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + pubname | schemaname | tablename | attnames | rowfilter +----------------+------------+------------------+-----------+----------- + 
testpub_except | public | pub_test_except1 | {a,b,c,d} | + testpub_except | pub_sch1 | pub_test_except2 | {a,d} | +(2 rows) + +-- Check for invalid cases +CREATE PUBLICATION testpub_except2 FOR TABLES IN SCHEMA pub_sch1, TABLE pub_test_except1 EXCEPT (b, c); +ERROR: cannot use column list for relation "public.pub_test_except1" in publication "testpub_except2" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except1 EXCEPT; +ERROR: syntax error at or near ";" +LINE 1: ...BLICATION testpub_except2 FOR TABLE pub_test_except1 EXCEPT; + ^ +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except3 EXCEPT (gen1); +ERROR: cannot use generated column "gen1" in publication except column list +-- Verify that publication can be altered with EXCEPT +ALTER PUBLICATION testpub_except SET TABLE pub_test_except1 EXCEPT (a, b), pub_sch1.pub_test_except2; +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + pubname | schemaname | tablename | attnames | rowfilter +----------------+------------+------------------+-----------+----------- + testpub_except | public | pub_test_except1 | {c,d} | + testpub_except | pub_sch1 | pub_test_except2 | {a,b,c,d} | +(2 rows) + +-- Verify ALTER PUBLICATION ... DROP +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1 EXCEPT (a, b); +ERROR: except column list must not be specified in ALTER PUBLICATION ... 
DROP +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1; +ALTER PUBLICATION testpub_except ADD TABLE pub_test_except1 EXCEPT (c, d); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + pubname | schemaname | tablename | attnames | rowfilter +----------------+------------+------------------+-----------+----------- + testpub_except | public | pub_test_except1 | {a,b} | + testpub_except | pub_sch1 | pub_test_except2 | {a,b,c,d} | +(2 rows) + +-- Verify excluded columns cannot be part of REPLICA IDENTITY +ALTER TABLE pub_test_except1 REPLICA IDENTITY FULL; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +ERROR: cannot update table "pub_test_except1" +DETAIL: Column list used by the publication does not cover the replica identity. +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a, c); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +ERROR: cannot update table "pub_test_except1" +DETAIL: Column list used by the publication does not cover the replica identity. 
+DROP INDEX pub_test_except1_a_idx; +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +DROP INDEX pub_test_except1_a_idx; +DROP PUBLICATION testpub_except; +DROP TABLE pub_test_except1; +DROP TABLE pub_sch1.pub_test_except2; +DROP TABLE pub_test_except3; DROP SCHEMA pub_sch1; RESET client_min_messages; RESET SESSION AUTHORIZATION; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index af31a2214ca..6b23f215739 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -1318,6 +1318,51 @@ SET ROLE regress_publication_user; DROP PUBLICATION testpub_reset; DROP TABLE pub_sch1.tbl1; DROP TABLE pub_sch1.tbl2; + +-- ====================================================== +-- Test EXCEPT columns for CREATE PUBLICATION + +SET client_min_messages = 'ERROR'; +CREATE TABLE pub_test_except1 (a int NOT NULL, b int, c int NOT NULL, d int); +CREATE TABLE pub_sch1.pub_test_except2 (a int, b int, c int, d int); +CREATE TABLE pub_test_except3 (a int, gen1 int GENERATED ALWAYS AS (a * 2) STORED); + +-- Verify that publication is created with EXCEPT +CREATE PUBLICATION testpub_except FOR TABLE pub_test_except1, pub_sch1.pub_test_except2 EXCEPT (b, c); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + +-- Check for invalid cases +CREATE PUBLICATION testpub_except2 FOR TABLES IN SCHEMA pub_sch1, TABLE pub_test_except1 EXCEPT (b, c); +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except1 EXCEPT; +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except3 EXCEPT (gen1); + +-- Verify that publication can be altered with EXCEPT +ALTER PUBLICATION testpub_except SET TABLE pub_test_except1 EXCEPT (a, b), pub_sch1.pub_test_except2; +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + +-- Verify ALTER PUBLICATION ... 
DROP +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1 EXCEPT (a, b); +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1; + +ALTER PUBLICATION testpub_except ADD TABLE pub_test_except1 EXCEPT (c, d); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + +-- Verify excluded columns cannot be part of REPLICA IDENTITY +ALTER TABLE pub_test_except1 REPLICA IDENTITY FULL; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a, c); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +DROP INDEX pub_test_except1_a_idx; +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; + +DROP INDEX pub_test_except1_a_idx; +DROP PUBLICATION testpub_except; +DROP TABLE pub_test_except1; +DROP TABLE pub_sch1.pub_test_except2; +DROP TABLE pub_test_except3; DROP SCHEMA pub_sch1; RESET client_min_messages; diff --git a/src/test/subscription/t/036_rep_changes_except_table.pl b/src/test/subscription/t/036_rep_changes_except_table.pl index 1d115283809..ec77f2e8d04 100644 --- a/src/test/subscription/t/036_rep_changes_except_table.pl +++ b/src/test/subscription/t/036_rep_changes_except_table.pl @@ -1,7 +1,7 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# Logical replication tests for except table publications +# Logical replication tests for except table and except column publications use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -77,6 +77,64 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM public.tab1"); is($result, qq(0||), 'check rows on subscriber catchup'); +# Test for except column publications +# Initial setup +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int, b int NOT 
NULL, c int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE sch1.tab2 (a int, b int, c int)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab2 VALUES (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_col FOR TABLE tab2 EXCEPT (a), sch1.tab2 EXCEPT (b, c)" +); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int, b int NOT NULL, c int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE sch1.tab2 (a int, b int, c int)"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_col CONNECTION '$publisher_connstr' PUBLICATION tap_pub_col" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_col'); + +# Test initial sync +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is($result, qq(|2|3), + 'check initial sync for except column publication'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.tab2"); +is($result, qq(1||), 'check initial sync for except column publication'); +$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (4, 5, 6)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab2 VALUES (4, 5, 6)"); +$node_publisher->wait_for_catchup('tap_sub_col'); + +# Test incremental changes +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is( $result, qq(|2|3 +|5|6), + 'check incremental insert for except column publication'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.tab2"); +is( $result, qq(1|| +4||), 'check incremental insert for except column publication'); + +# Test for update +$node_publisher->safe_psql('postgres', + "CREATE UNIQUE INDEX b_idx ON tab2 (b)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 REPLICA IDENTITY USING INDEX b_idx"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX b_idx ON tab2 
(b)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab2 REPLICA IDENTITY USING INDEX b_idx"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET a = 3, b = 4, c = 5 WHERE a = 1"); +$node_publisher->wait_for_catchup('tap_sub_col'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is( $result, qq(|5|6 +|4|5), + 'check update for except column publication'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.34.1