diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 7c8f76887..76e2ee525 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -803,7 +803,8 @@ impl PgReplicationClient { /// Retrieves schema information for all columns in a table. /// /// If a publication is specified, only columns included in that publication - /// will be returned. + /// will be returned. Generated columns are always excluded since they are not + /// supported in PostgreSQL logical replication. async fn get_column_schemas( &self, table_id: TableId, @@ -864,6 +865,40 @@ impl PgReplicationClient { publication_predicate = publication_filter.predicate, ); + // Check for generated columns so we can warn if there are any. + let generated_columns_check_query = format!( + r#"select exists ( + select 1 + from pg_attribute + where attrelid = {table_id} + and attnum > 0 + and not attisdropped + and attgenerated != '' + ) as has_generated;"# + ); + + for message in self + .client + .simple_query(&generated_columns_check_query) + .await? + { + if let SimpleQueryMessage::Row(row) = message { + let has_generated_columns = + Self::get_row_value::(&row, "has_generated", "pg_attribute").await? + == "t"; + if has_generated_columns { + warn!( + "Table {} contains generated columns that will NOT be replicated. \ + Generated columns are not supported in PostgreSQL logical replication and will \ + be excluded from the ETL schema. These columns will NOT appear in the destination.", + table_id + ); + } + // Explicity break for clarity; this query returns a single SimpleQueryMessage::Row. + break; + } + } + let mut column_schemas = vec![]; for message in self.client.simple_query(&column_info_query).await? { if let SimpleQueryMessage::Row(row) = message {