Skip to content

Commit a50a244

Browse files
committed
Improve
1 parent 2de1c17 commit a50a244

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

etl-destinations/src/iceberg/schema.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,38 +83,50 @@ fn create_iceberg_list_type(element_type: PrimitiveType, field_id: i32) -> Icebe
8383
/// Primary key columns are converted to Iceberg identifier fields. This enables
8484
/// Iceberg to understand which columns uniquely identify rows in the table.
8585
/// Iceberg identifier fields are unordered (stored as a set).
86+
///
87+
/// Field IDs are assigned following iceberg-rust's convention: all outer field IDs
88+
/// are assigned first, then nested field IDs (e.g., list element fields). This ensures
89+
/// consistency with how the Iceberg library handles schema evolution.
8690
pub fn postgres_to_iceberg_schema(
8791
column_schemas: &[ColumnSchema],
8892
) -> Result<IcebergSchema, iceberg::Error> {
89-
let mut fields = Vec::new();
9093
let mut identifier_field_ids = Vec::new();
91-
let mut field_id = 1;
9294

93-
// Convert each column to Iceberg field.
94-
for column_schema in column_schemas {
95-
let current_field_id = field_id;
95+
// First pass: assign IDs to all outer fields (1, 2, 3, ...).
96+
let mut outer_field_id = 1;
97+
let outer_fields: Vec<_> = column_schemas
98+
.iter()
99+
.map(|col| {
100+
let id = outer_field_id;
101+
outer_field_id += 1;
102+
(col, id)
103+
})
104+
.collect();
105+
106+
// Second pass: assign IDs to nested fields (list elements) and build the schema.
107+
// Nested field IDs start after all outer field IDs.
108+
let mut nested_field_id = outer_field_id;
109+
let mut fields = Vec::new();
96110

111+
for (column_schema, field_id) in outer_fields {
97112
let field_type = if is_array_type(&column_schema.typ) {
98-
// For array types, we need to assign a unique field ID to the list element.
99-
// The list field uses current_field_id, the element field uses the next ID.
100-
field_id += 1;
101-
postgres_array_type_to_iceberg_type(&column_schema.typ, field_id)
113+
let element_id = nested_field_id;
114+
nested_field_id += 1;
115+
postgres_array_type_to_iceberg_type(&column_schema.typ, element_id)
102116
} else {
103117
postgres_scalar_type_to_iceberg_type(&column_schema.typ)
104118
};
105119

106120
let field = if column_schema.nullable {
107-
NestedField::optional(current_field_id, &column_schema.name, field_type)
121+
NestedField::optional(field_id, &column_schema.name, field_type)
108122
} else {
109-
NestedField::required(current_field_id, &column_schema.name, field_type)
123+
NestedField::required(field_id, &column_schema.name, field_type)
110124
};
111125
fields.push(Arc::new(field));
112126

113127
if column_schema.primary_key() {
114-
identifier_field_ids.push(current_field_id);
128+
identifier_field_ids.push(field_id);
115129
}
116-
117-
field_id += 1;
118130
}
119131

120132
let mut builder = IcebergSchema::builder().with_fields(fields);

0 commit comments

Comments
 (0)