Skip to content

Commit def7919

Browse files
feat!: error on surplus columns in output schema (#1488)
## What changes are proposed in this pull request? Fixes [#1248](#1248) by adding field count validation to Struct expression evaluation, preventing silent data loss when the expression produces fewer fields than the output schema expects. The validation exposed a bug (test `test_append_partitioned` in `kernel/tests/write.rs` started failing) in partitioned table writes where the full logical schema (with partition columns) was incorrectly used instead of the physical schema (without partition columns), which is fixed by adding a `physical_schema` field to `WriteContext` that properly excludes partition columns. ## How was this change tested? Added addional test case `test_struct_expression_schema_validation` in `kernel/src/engine/arrow_expression/evaluate_expression.rs::944` --------- Co-authored-by: aleksandarskrbic <aleksandarskrbic@users.noreply.github.com>
1 parent aa23fd0 commit def7919

File tree

4 files changed

+123
-7
lines changed

4 files changed

+123
-7
lines changed

ffi/src/transaction/write_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub unsafe extern "C" fn get_write_schema(
4343
write_context: Handle<SharedWriteContext>,
4444
) -> Handle<SharedSchema> {
4545
let write_context = unsafe { write_context.as_ref() };
46-
write_context.schema().clone().into()
46+
write_context.logical_schema().clone().into()
4747
}
4848

4949
/// Get write path from WriteContext handle.

kernel/src/engine/arrow_expression/evaluate_expression.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ fn evaluate_struct_expression(
102102
batch: &RecordBatch,
103103
output_schema: &StructType,
104104
) -> DeltaResult<ArrayRef> {
105+
if fields.len() != output_schema.num_fields() {
106+
return Err(Error::generic(format!(
107+
"Struct expression field count mismatch: {} fields in expression but {} in schema",
108+
fields.len(),
109+
output_schema.num_fields()
110+
)));
111+
}
112+
105113
let output_cols: Vec<ArrayRef> = fields
106114
.iter()
107115
.zip(output_schema.fields())
@@ -931,6 +939,49 @@ mod tests {
931939
.contains("Data type is required"));
932940
}
933941

942+
#[test]
943+
fn test_struct_expression_schema_validation() {
944+
let batch = create_test_batch();
945+
946+
let test_cases = vec![
947+
(
948+
"too many schema fields",
949+
Expr::Struct(vec![column_expr_ref!("a"), column_expr_ref!("b")]),
950+
StructType::new_unchecked(vec![
951+
StructField::not_null("a", DataType::INTEGER),
952+
StructField::not_null("b", DataType::INTEGER),
953+
StructField::not_null("c", DataType::INTEGER),
954+
]),
955+
),
956+
(
957+
"too few schema fields",
958+
Expr::Struct(vec![
959+
column_expr_ref!("a"),
960+
column_expr_ref!("b"),
961+
column_expr_ref!("c"),
962+
]),
963+
StructType::new_unchecked(vec![
964+
StructField::not_null("a", DataType::INTEGER),
965+
StructField::not_null("b", DataType::INTEGER),
966+
]),
967+
),
968+
];
969+
970+
for (name, expr, schema) in test_cases {
971+
let result =
972+
evaluate_expression(&expr, &batch, Some(&DataType::Struct(Box::new(schema))));
973+
assert!(result.is_err(), "Test case '{}' should fail", name);
974+
assert!(
975+
result
976+
.unwrap_err()
977+
.to_string()
978+
.contains("field count mismatch"),
979+
"Test case '{}' should contain 'field count mismatch' error",
980+
name
981+
);
982+
}
983+
}
984+
934985
#[test]
935986
fn test_coalesce_arrays_same_type() {
936987
// Test with Int32 arrays

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
9999
) -> DeltaResult<Box<dyn EngineData>> {
100100
let transform = write_context.logical_to_physical();
101101
let input_schema = Schema::try_from_arrow(data.record_batch().schema())?;
102-
let output_schema = write_context.schema();
102+
let output_schema = write_context.physical_schema();
103103
let logical_to_physical_expr = self.evaluation_handler().new_expression_evaluator(
104104
input_schema.into(),
105105
transform.clone(),

kernel/src/transaction/mod.rs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -557,9 +557,23 @@ impl Transaction {
557557
let target_dir = self.read_snapshot.table_root();
558558
let snapshot_schema = self.read_snapshot.schema();
559559
let logical_to_physical = self.generate_logical_to_physical();
560+
561+
// Compute physical schema: exclude partition columns since they're stored in the path
562+
let partition_columns = self
563+
.read_snapshot
564+
.table_configuration()
565+
.metadata()
566+
.partition_columns();
567+
let physical_fields = snapshot_schema
568+
.fields()
569+
.filter(|f| !partition_columns.contains(f.name()))
570+
.cloned();
571+
let physical_schema = Arc::new(StructType::new_unchecked(physical_fields));
572+
560573
WriteContext::new(
561574
target_dir.clone(),
562575
snapshot_schema,
576+
physical_schema,
563577
Arc::new(logical_to_physical),
564578
)
565579
}
@@ -874,15 +888,22 @@ impl Transaction {
874888
/// [`Transaction`]: struct.Transaction.html
875889
pub struct WriteContext {
876890
target_dir: Url,
877-
schema: SchemaRef,
891+
logical_schema: SchemaRef,
892+
physical_schema: SchemaRef,
878893
logical_to_physical: ExpressionRef,
879894
}
880895

881896
impl WriteContext {
882-
fn new(target_dir: Url, schema: SchemaRef, logical_to_physical: ExpressionRef) -> Self {
897+
fn new(
898+
target_dir: Url,
899+
logical_schema: SchemaRef,
900+
physical_schema: SchemaRef,
901+
logical_to_physical: ExpressionRef,
902+
) -> Self {
883903
WriteContext {
884904
target_dir,
885-
schema,
905+
logical_schema,
906+
physical_schema,
886907
logical_to_physical,
887908
}
888909
}
@@ -891,8 +912,12 @@ impl WriteContext {
891912
&self.target_dir
892913
}
893914

894-
pub fn schema(&self) -> &SchemaRef {
895-
&self.schema
915+
pub fn logical_schema(&self) -> &SchemaRef {
916+
&self.logical_schema
917+
}
918+
919+
pub fn physical_schema(&self) -> &SchemaRef {
920+
&self.physical_schema
896921
}
897922

898923
pub fn logical_to_physical(&self) -> ExpressionRef {
@@ -1109,4 +1134,44 @@ mod tests {
11091134

11101135
Ok(())
11111136
}
1137+
1138+
#[test]
1139+
fn test_physical_schema_excludes_partition_columns() -> Result<(), Box<dyn std::error::Error>> {
1140+
let engine = SyncEngine::new();
1141+
let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
1142+
let url = url::Url::from_directory_path(path).unwrap();
1143+
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
1144+
let txn = snapshot
1145+
.transaction(Box::new(FileSystemCommitter::new()))?
1146+
.with_engine_info("default engine");
1147+
1148+
let write_context = txn.get_write_context();
1149+
let logical_schema = write_context.logical_schema();
1150+
let physical_schema = write_context.physical_schema();
1151+
1152+
// Logical schema should include the partition column
1153+
assert!(
1154+
logical_schema.contains("letter"),
1155+
"Logical schema should contain partition column 'letter'"
1156+
);
1157+
1158+
// Physical schema should exclude the partition column
1159+
assert!(
1160+
!physical_schema.contains("letter"),
1161+
"Physical schema should not contain partition column 'letter' (stored in path)"
1162+
);
1163+
1164+
// Both should contain the non-partition columns
1165+
assert!(
1166+
logical_schema.contains("number"),
1167+
"Logical schema should contain data column 'number'"
1168+
);
1169+
1170+
assert!(
1171+
physical_schema.contains("number"),
1172+
"Physical schema should contain data column 'number'"
1173+
);
1174+
1175+
Ok(())
1176+
}
11121177
}

0 commit comments

Comments
 (0)