Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions etl/src/replication/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,8 @@ impl PgReplicationClient {
/// Retrieves schema information for all columns in a table.
///
/// If a publication is specified, only columns included in that publication
/// will be returned.
/// will be returned. Generated columns are always excluded since they are not
/// supported in PostgreSQL logical replication.
async fn get_column_schemas(
&self,
table_id: TableId,
Expand Down Expand Up @@ -831,6 +832,19 @@ impl PgReplicationClient {
join direct_parent dp on con.conrelid = dp.parent_oid
where con.contype = 'p'
group by con.conname
),
-- Identify generated columns for warning purposes
generated_cols as (
select
c.relname as table_name,
string_agg(a.attname, ', ' order by a.attnum) as generated_column_names
from pg_class c
join pg_attribute a on a.attrelid = c.oid
where c.oid = {table_id}
and a.attnum > 0
and not a.attisdropped
and a.attgenerated != ''
group by c.relname
)
select
a.attname,
Expand All @@ -847,7 +861,9 @@ impl PgReplicationClient {
where a.attname = any(pk.pk_column_names)
) then true
else false
end as primary
end as primary,
(select table_name from generated_cols) as gen_table_name,
(select generated_column_names from generated_cols) as gen_columns
from pg_attribute a
left join pg_index i
on a.attrelid = i.indrelid
Expand All @@ -865,6 +881,8 @@ impl PgReplicationClient {
);

let mut column_schemas = vec![];
let mut warned_about_generated = false;

for message in self.client.simple_query(&column_info_query).await? {
if let SimpleQueryMessage::Row(row) = message {
let name = Self::get_row_value::<String>(&row, "attname", "pg_attribute").await?;
Expand All @@ -876,6 +894,20 @@ impl PgReplicationClient {
let primary =
Self::get_row_value::<String>(&row, "primary", "pg_index").await? == "t";

// Check for generated columns and warn once per table
if !warned_about_generated
&& let Some(gen_columns) = row.try_get("gen_columns")?
&& let Some(table_name) = row.try_get("gen_table_name")?
{
warn!(
"Table '{}' contains generated columns ({}) that will NOT be replicated. \
Generated columns are not supported in PostgreSQL logical replication and will \
be excluded from the ETL schema. These columns will NOT appear in the destination.",
table_name, gen_columns
);
warned_about_generated = true;
}

let typ = convert_type_oid_to_type(type_oid);

column_schemas.push(ColumnSchema {
Expand Down