Skip to content

Commit 4e536f9

Browse files
feat: enable CDF for column-mapped tables (#1510)
## What changes are proposed in this pull request? enable CDF (previously blocked due to some table configuration checks) on column-mapped tables. After migrating to use transforms in CDF implementation, all the pieces are there - just need to ungate + test it. additionally, adds ability to read CDF for the following table features: `V2Checkpoint`, `VacuumProtocolCheck`, and `TimestampWithoutTimezones` ## How was this change tested? new integration tests
1 parent c6cbbb5 commit 4e536f9

File tree

9 files changed

+102
-54
lines changed

9 files changed

+102
-54
lines changed

kernel/src/actions/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,33 @@ impl Protocol {
559559
self.has_table_feature(&TableFeature::CatalogManaged)
560560
|| self.has_table_feature(&TableFeature::CatalogOwnedPreview)
561561
}
562+
563+
pub(crate) fn is_cdf_supported(&self) -> bool {
564+
// TODO: we should probably expand this to ~all supported reader features instead of gating
565+
// on a subset here. missing ones are:
566+
// - variantType
567+
// - typeWidening
568+
// - catalogManaged
569+
static CDF_SUPPORTED_READER_FEATURES: LazyLock<Vec<TableFeature>> = LazyLock::new(|| {
570+
vec![
571+
TableFeature::DeletionVectors,
572+
TableFeature::ColumnMapping,
573+
TableFeature::TimestampWithoutTimezone,
574+
TableFeature::V2Checkpoint,
575+
TableFeature::VacuumProtocolCheck,
576+
]
577+
});
578+
match self.reader_features() {
579+
// if min_reader_version = 3 and all reader features are subset of supported => OK
580+
Some(reader_features) if self.min_reader_version() == 3 => {
581+
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES).is_ok()
582+
}
583+
// if min_reader_version = 1 or 2 and there are no reader features => OK
584+
None => (1..=2).contains(&self.min_reader_version()),
585+
// any other protocol is not supported
586+
_ => false,
587+
}
588+
}
562589
}
563590

564591
// TODO: implement Scalar::From<HashMap<K, V>> so we can derive IntoEngineData using a macro (issue#1083)

kernel/src/table_changes/log_replay.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use crate::schema::{
1919
ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType,
2020
ToSchema as _,
2121
};
22+
use crate::table_changes::check_cdf_table_properties;
2223
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
23-
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
2424
use crate::table_properties::TableProperties;
2525
use crate::utils::require;
2626
use crate::{DeltaResult, Engine, EngineData, Error, PredicateRef, RowVisitor};
@@ -180,8 +180,10 @@ impl LogReplayScanner {
180180
visitor.visit_rows_of(actions.as_ref())?;
181181

182182
if let Some(protocol) = visitor.protocol {
183-
ensure_cdf_read_supported(&protocol)
184-
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
183+
protocol
184+
.is_cdf_supported()
185+
.then_some(())
186+
.ok_or_else(|| Error::change_data_feed_unsupported(commit_file.version))?;
185187
}
186188
if let Some((schema, configuration)) = visitor.metadata_info {
187189
let schema: StructType = serde_json::from_str(&schema)?;

kernel/src/table_changes/log_replay/tests.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ async fn unsupported_reader_feature() {
138138
Protocol::try_new(
139139
3,
140140
7,
141-
Some([TableFeature::DeletionVectors, TableFeature::ColumnMapping]),
142-
Some([TableFeature::DeletionVectors, TableFeature::ColumnMapping]),
141+
Some([TableFeature::DeletionVectors, TableFeature::TypeWidening]),
142+
Some([TableFeature::DeletionVectors, TableFeature::TypeWidening]),
143143
)
144144
.unwrap(),
145145
)])
@@ -157,7 +157,7 @@ async fn unsupported_reader_feature() {
157157
assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))));
158158
}
159159
#[tokio::test]
160-
async fn column_mapping_should_fail() {
160+
async fn column_mapping_should_succeed() {
161161
let engine = Arc::new(SyncEngine::new());
162162
let mut mock_table = LocalMockTable::new();
163163
mock_table
@@ -190,7 +190,8 @@ async fn column_mapping_should_fail() {
190190
.unwrap()
191191
.try_collect();
192192

193-
assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))));
193+
// Column mapping with CDF should now succeed
194+
assert!(res.is_ok(), "CDF should now support column mapping");
194195
}
195196

196197
// Note: This should be removed once type widening support is added for CDF

kernel/src/table_changes/mod.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,10 @@ use std::sync::{Arc, LazyLock};
3636
use scan::TableChangesScanBuilder;
3737
use url::Url;
3838

39-
use crate::actions::{ensure_supported_features, Protocol};
4039
use crate::log_segment::LogSegment;
4140
use crate::path::AsUrl;
4241
use crate::schema::{DataType, Schema, StructField, StructType};
4342
use crate::snapshot::{Snapshot, SnapshotRef};
44-
use crate::table_features::{ColumnMappingMode, TableFeature};
4543
use crate::table_properties::TableProperties;
4644
use crate::utils::require;
4745
use crate::{DeltaResult, Engine, Error, Version};
@@ -253,40 +251,15 @@ impl TableChanges {
253251

254252
/// Ensures that change data feed is enabled in `table_properties`. See the documentation
255253
/// of [`TableChanges`] for more details.
254+
// TODO: move to TableProperties and normalize with the check in TableConfiguration
256255
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
257256
require!(
258257
table_properties.enable_change_data_feed.unwrap_or(false),
259258
Error::unsupported("Change data feed is not enabled")
260259
);
261-
require!(
262-
matches!(
263-
table_properties.column_mapping_mode,
264-
None | Some(ColumnMappingMode::None)
265-
),
266-
Error::unsupported("Change data feed not supported when column mapping is enabled")
267-
);
268260
Ok(())
269261
}
270262

271-
/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
272-
/// See the documentation of [`TableChanges`] for more details.
273-
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
274-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<Vec<TableFeature>> =
275-
LazyLock::new(|| vec![TableFeature::DeletionVectors]);
276-
match &protocol.reader_features() {
277-
// if min_reader_version = 3 and all reader features are subset of supported => OK
278-
Some(reader_features) if protocol.min_reader_version() == 3 => {
279-
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES)
280-
}
281-
// if min_reader_version = 1 and there are no reader features => OK
282-
None if protocol.min_reader_version() == 1 => Ok(()),
283-
// any other protocol is not supported
284-
_ => Err(Error::unsupported(
285-
"Change data feed not supported on this protocol",
286-
)),
287-
}
288-
}
289-
290263
#[cfg(test)]
291264
mod tests {
292265
use super::*;

kernel/src/table_configuration.rs

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
//! [`TableProperties`].
99
//!
1010
//! [`Schema`]: crate::schema::Schema
11-
use std::sync::{Arc, LazyLock};
11+
use std::sync::Arc;
1212

1313
use url::Url;
1414

15-
use crate::actions::{ensure_supported_features, Metadata, Protocol};
15+
use crate::actions::{Metadata, Protocol};
1616
use crate::schema::variant_utils::validate_variant_type_feature_support;
1717
use crate::schema::{InvariantChecker, SchemaRef};
1818
use crate::table_features::{
@@ -400,27 +400,12 @@ impl TableConfiguration {
400400
/// [`TableChanges`]: crate::table_changes::TableChanges
401401
#[internal_api]
402402
pub(crate) fn is_cdf_read_supported(&self) -> bool {
403-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<Vec<TableFeature>> =
404-
LazyLock::new(|| vec![TableFeature::DeletionVectors]);
405-
let protocol_supported = match self.protocol.reader_features() {
406-
// if min_reader_version = 3 and all reader features are subset of supported => OK
407-
Some(reader_features) if self.protocol.min_reader_version() == 3 => {
408-
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES).is_ok()
409-
}
410-
// if min_reader_version = 1 and there are no reader features => OK
411-
None => self.protocol.min_reader_version() == 1,
412-
// any other protocol is not supported
413-
_ => false,
414-
};
403+
let protocol_supported = self.protocol.is_cdf_supported();
415404
let cdf_enabled = self
416405
.table_properties
417406
.enable_change_data_feed
418407
.unwrap_or(false);
419-
let column_mapping_disabled = matches!(
420-
self.table_properties.column_mapping_mode,
421-
None | Some(ColumnMappingMode::None)
422-
);
423-
protocol_supported && cdf_enabled && column_mapping_disabled
408+
protocol_supported && cdf_enabled
424409
}
425410

426411
/// Returns `true` if deletion vectors is supported on this table. To support deletion vectors,

kernel/tests/cdf.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,3 +545,63 @@ fn conditional_delete_two_rows() -> DeltaResult<()> {
545545
assert_batches_sorted_eq!(expected, &batches);
546546
Ok(())
547547
}
548+
549+
#[test]
550+
fn cdf_with_column_mapping_name_mode() -> Result<(), Box<dyn error::Error>> {
551+
// NOTE: these tables only have CDF enabled in version 1+, so we start reading from 1. This is
552+
// due to pyspark limitation while writing: we were unable to create a table with column
553+
// mapping + CDF enabled in commit 0, so we created with column mapping and enabled CDF in
554+
// commit 1.
555+
let batches = read_cdf_for_table("cdf-column-mapping-name-mode", 1, None, None)?;
556+
let mut expected = vec![
557+
"+----+-------+-------+------------------+-----------------+",
558+
"| id | name | value | _change_type | _commit_version |",
559+
"+----+-------+-------+------------------+-----------------+",
560+
"| 1 | Alice | 100.0 | delete | 4 |",
561+
"| 2 | Bob | 200.0 | update_preimage | 2 |",
562+
"| 2 | Bob | 250.0 | update_postimage | 2 |",
563+
"| 4 | David | 400.0 | insert | 3 |",
564+
"+----+-------+-------+------------------+-----------------+",
565+
];
566+
sort_lines!(expected);
567+
assert_batches_sorted_eq!(expected, &batches);
568+
569+
// same as above but instead of protocol 2,5 this is 3,7 with columnMapping+DV features
570+
let batches = read_cdf_for_table("cdf-column-mapping-name-mode-3-7", 1, None, None)?;
571+
let mut expected = vec![
572+
"+----+-------+-------+------------------+-----------------+",
573+
"| id | name | value | _change_type | _commit_version |",
574+
"+----+-------+-------+------------------+-----------------+",
575+
"| 1 | Alice | 100.0 | delete | 4 |",
576+
"| 2 | Bob | 200.0 | update_preimage | 2 |",
577+
"| 2 | Bob | 250.0 | update_postimage | 2 |",
578+
"| 4 | David | 400.0 | insert | 3 |",
579+
"+----+-------+-------+------------------+-----------------+",
580+
];
581+
sort_lines!(expected);
582+
assert_batches_sorted_eq!(expected, &batches);
583+
584+
Ok(())
585+
}
586+
587+
#[test]
588+
fn cdf_with_column_mapping_id_mode() -> Result<(), Box<dyn error::Error>> {
589+
// NOTE: these tables only have CDF enabled in version 1+, so we start reading from 1. This is
590+
// due to pyspark limitation while writing: we were unable to create a table with column
591+
// mapping + CDF enabled in commit 0, so we created with column mapping and enabled CDF in
592+
// commit 1.
593+
let batches = read_cdf_for_table("cdf-column-mapping-id-mode", 1, None, None)?;
594+
let mut expected = vec![
595+
"+----+-------+-------+------------------+-----------------+",
596+
"| id | name | value | _change_type | _commit_version |",
597+
"+----+-------+-------+------------------+-----------------+",
598+
"| 2 | Frank | 250.0 | update_preimage | 2 |",
599+
"| 2 | Frank | 275.0 | update_postimage | 2 |",
600+
"| 3 | Grace | 350.0 | delete | 4 |",
601+
"| 4 | Henry | 450.0 | insert | 3 |",
602+
"+----+-------+-------+------------------+-----------------+",
603+
];
604+
sort_lines!(expected);
605+
assert_batches_sorted_eq!(expected, &batches);
606+
Ok(())
607+
}
5.35 KB
Binary file not shown.
6.2 KB
Binary file not shown.
6.05 KB
Binary file not shown.

0 commit comments

Comments (0)