Skip to content

Commit b7219a8

Browse files
committed
Merge remote-tracking branch 'upstream/main' into feat/metrics
2 parents 2af7649 + 74e75d3 commit b7219a8

File tree

20 files changed

+666
-105
lines changed

20 files changed

+666
-105
lines changed

ffi/examples/read-table/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ set(DatPath "../../../../acceptance/tests/dat/out/reader_tests/generated")
1616
set(ExpectedPath "../../../tests/read-table-testing/expected-data")
1717
set(KernelTestPath "../../../../kernel/tests/data")
1818
add_test(NAME read_and_print_all_prim COMMAND ${TestRunner} ${DatPath}/all_primitive_types/delta/ ${ExpectedPath}/all-prim-types.expected)
19+
add_test(NAME read_and_print_nested COMMAND ${TestRunner} ${DatPath}/nested_types/delta/ ${ExpectedPath}/nested-types.expected)
1920
add_test(NAME read_and_print_basic_partitioned COMMAND ${TestRunner} ${DatPath}/basic_partitioned/delta/ ${ExpectedPath}/basic-partitioned.expected)
2021
add_test(NAME read_and_print_with_dv_small COMMAND ${TestRunner} ${KernelTestPath}/table-with-dv-small/ ${ExpectedPath}/table-with-dv-small.expected)
2122

ffi/examples/read-table/schema.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,10 @@ void visit_array(
148148
{
149149
SchemaBuilder* builder = data;
150150
char* name_ptr = malloc(sizeof(char) * (name.len + 22));
151-
snprintf(name_ptr, name.len + 1, "%s", name.ptr);
152-
snprintf(name_ptr + name.len, 22, " (is nullable: %s)", is_nullable ? "true" : "false");
151+
// NOTE: we truncate to the max int size because the format specifier "%.*s" requires an int length specifier
152+
int name_chars = name.len > INT_MAX ? INT_MAX : (int)name.len; // handle _REALLY_ long names by truncation
153+
int wrote = snprintf(name_ptr, name.len + 1, "%.*s", name_chars, name.ptr);
154+
snprintf(name_ptr + wrote, 22, " (is nullable: %s)", is_nullable ? "true" : "false");
153155
print_physical_name(name_ptr, metadata);
154156
PRINT_CHILD_VISIT("array", name_ptr, sibling_list_id, "Types", child_list_id);
155157
SchemaItem* array_item = add_to_list(&builder->lists[sibling_list_id], name_ptr, "array", is_nullable);
@@ -166,8 +168,10 @@ void visit_map(
166168
{
167169
SchemaBuilder* builder = data;
168170
char* name_ptr = malloc(sizeof(char) * (name.len + 22));
169-
snprintf(name_ptr, name.len + 1, "%s", name.ptr);
170-
snprintf(name_ptr + name.len, 22, " (is nullable: %s)", is_nullable ? "true" : "false");
171+
// NOTE: we truncate to the max int size because the format specifier "%.*s" requires an int length specifier
172+
int name_chars = name.len > INT_MAX ? INT_MAX : (int)name.len; // handle _REALLY_ long names by truncation
173+
int wrote = snprintf(name_ptr, name.len + 1, "%.*s", name_chars, name.ptr);
174+
snprintf(name_ptr + wrote, 22, " (is nullable: %s)", is_nullable ? "true" : "false");
171175
print_physical_name(name_ptr, metadata);
172176
PRINT_CHILD_VISIT("map", name_ptr, sibling_list_id, "Types", child_list_id);
173177
SchemaItem* map_item = add_to_list(&builder->lists[sibling_list_id], name_ptr, "map", is_nullable);

ffi/src/expressions/engine_visitor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ fn visit_expression_array(
317317
array: &ArrayData,
318318
sibling_list_id: usize,
319319
) {
320-
#[allow(deprecated)]
321320
let elements = array.array_elements();
322321
let child_list_id = call!(visitor, make_field_list, elements.len());
323322
for scalar in elements {

ffi/tests/read-table-testing/expected-data/nested-types.expected

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
Reading table at ../../../../acceptance/tests/dat/out/reader_tests/generated/nested_types/delta/
2+
version: 0
3+
4+
Schema:
5+
├─ pk: integer
6+
├─ struct: struct
7+
│ ├─ float64: double
8+
│ └─ bool: boolean
9+
├─ array (is nullable: true): array
10+
│ └─ array_element: short
11+
└─ map (is nullable: true): map
12+
├─ map_key: string
13+
└─ map_value: integer
14+
15+
pk: [
16+
0,
17+
1,
18+
2,
19+
3,
20+
4
21+
]
22+
struct: -- is_valid: all not null
23+
-- child 0 type: double
24+
[
25+
0,
26+
1,
27+
2,
28+
3,
29+
4
30+
]
31+
-- child 1 type: bool
32+
[
33+
true,
34+
false,
35+
true,
36+
false,
37+
true
38+
]
39+
array: [
40+
[
41+
0
42+
],
43+
[
44+
0,
45+
1
46+
],
47+
[
48+
0,
49+
1,
50+
2
51+
],
52+
[
53+
0,
54+
1,
55+
2,
56+
3
57+
],
58+
[
59+
0,
60+
1,
61+
2,
62+
3,
63+
4
64+
]
65+
]
66+
map: [
67+
keys:
68+
[]
69+
values:
70+
[],
71+
keys:
72+
[
73+
"0"
74+
]
75+
values:
76+
[
77+
0
78+
],
79+
keys:
80+
[
81+
"0",
82+
"1"
83+
]
84+
values:
85+
[
86+
0,
87+
1
88+
],
89+
keys:
90+
[
91+
"0",
92+
"1",
93+
"2"
94+
]
95+
values:
96+
[
97+
0,
98+
1,
99+
2
100+
],
101+
keys:
102+
[
103+
"0",
104+
"1",
105+
"2",
106+
"3"
107+
]
108+
values:
109+
[
110+
0,
111+
1,
112+
2,
113+
3
114+
]
115+
]

kernel/src/actions/mod.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -590,11 +590,6 @@ impl Protocol {
590590
/// Check if writing to a table with this protocol is supported. That is: does the kernel
591591
/// support the specified protocol writer version and all enabled writer features?
592592
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
593-
#[cfg(feature = "catalog-managed")]
594-
require!(
595-
!self.is_catalog_managed(),
596-
Error::unsupported("Writes are not yet supported for catalog-managed tables")
597-
);
598593
match &self.writer_features {
599594
Some(writer_features) if self.min_writer_version == 7 => {
600595
// if we're on version 7, make sure we support all the specified features
@@ -1610,7 +1605,7 @@ mod tests {
16101605
.unwrap();
16111606
assert_result_error_with_message(
16121607
protocol.ensure_write_supported(),
1613-
r#"Unsupported: Found unsupported TableFeatures: "identityColumns". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
1608+
r#"Unsupported: Found unsupported TableFeatures: "identityColumns". Supported TableFeatures: "changeDataFeed", "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview""#,
16141609
);
16151610

16161611
// Unknown writer features are allowed during creation for forward compatibility,
@@ -1624,7 +1619,7 @@ mod tests {
16241619
.unwrap();
16251620
assert_result_error_with_message(
16261621
protocol.ensure_write_supported(),
1627-
r#"Unsupported: Found unsupported TableFeatures: "unsupported writer". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
1622+
r#"Unsupported: Found unsupported TableFeatures: "unsupported writer". Supported TableFeatures: "changeDataFeed", "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview""#,
16281623
);
16291624
}
16301625

@@ -1680,24 +1675,25 @@ mod tests {
16801675
assert_eq!(parse_features::<TableFeature>(features), expected);
16811676
}
16821677

1678+
#[cfg(feature = "catalog-managed")]
16831679
#[test]
1684-
fn test_no_catalog_managed_writes() {
1680+
fn test_catalog_managed_writes() {
16851681
let protocol = Protocol::try_new(
16861682
3,
16871683
7,
16881684
Some([TableFeature::CatalogManaged]),
16891685
Some([TableFeature::CatalogManaged]),
16901686
)
16911687
.unwrap();
1692-
assert!(protocol.ensure_write_supported().is_err());
1688+
assert!(protocol.ensure_write_supported().is_ok());
16931689
let protocol = Protocol::try_new(
16941690
3,
16951691
7,
16961692
Some([TableFeature::CatalogOwnedPreview]),
16971693
Some([TableFeature::CatalogOwnedPreview]),
16981694
)
16991695
.unwrap();
1700-
assert!(protocol.ensure_write_supported().is_err());
1696+
assert!(protocol.ensure_write_supported().is_ok());
17011697
}
17021698

17031699
#[test]

kernel/src/committer.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,17 @@ use url::Url;
4545
pub struct CommitMetadata {
4646
pub(crate) log_root: LogRoot,
4747
pub(crate) version: Version,
48+
pub(crate) in_commit_timestamp: i64,
4849
// in the future this will include Protocol, Metadata, CommitInfo, Domain Metadata, etc.
4950
}
5051

5152
impl CommitMetadata {
52-
pub(crate) fn new(log_root: LogRoot, version: Version) -> Self {
53-
Self { log_root, version }
53+
pub(crate) fn new(log_root: LogRoot, version: Version, in_commit_timestamp: i64) -> Self {
54+
Self {
55+
log_root,
56+
version,
57+
in_commit_timestamp,
58+
}
5459
}
5560

5661
/// The commit path is the absolute path (e.g. s3://bucket/table/_delta_log/{version}.json) to
@@ -73,6 +78,16 @@ impl CommitMetadata {
7378
pub fn version(&self) -> Version {
7479
self.version
7580
}
81+
82+
/// The in-commit timestamp for the commit. Note that this may differ from the actual commit
83+
/// file modification time.
84+
pub fn in_commit_timestamp(&self) -> i64 {
85+
self.in_commit_timestamp
86+
}
87+
88+
pub fn table_root(&self) -> &Url {
89+
self.log_root.table_root()
90+
}
7691
}
7792

7893
/// `CommitResponse` is the result of committing a transaction via a catalog. The committer uses
@@ -171,11 +186,14 @@ mod tests {
171186
let table_root = Url::parse("s3://my-bucket/path/to/table/").unwrap();
172187
let log_root = LogRoot::new(table_root).unwrap();
173188
let version = 42;
189+
let ts = 1234;
174190

175-
let commit_metadata = CommitMetadata::new(log_root, version);
191+
let commit_metadata = CommitMetadata::new(log_root, version, ts);
176192

177193
// version
178194
assert_eq!(commit_metadata.version(), 42);
195+
// in_commit_timestamp
196+
assert_eq!(commit_metadata.in_commit_timestamp(), 1234);
179197

180198
// published commit path
181199
let published_path = commit_metadata.published_commit_path().unwrap();
@@ -189,8 +207,9 @@ mod tests {
189207
let staged_path_str = staged_path.as_str();
190208

191209
assert!(
192-
staged_path_str
193-
.starts_with("s3://my-bucket/path/to/table/_delta_log/00000000000000000042."),
210+
staged_path_str.starts_with(
211+
"s3://my-bucket/path/to/table/_delta_log/_staged_commits/00000000000000000042."
212+
),
194213
"Staged path should start with the correct prefix, got: {}",
195214
staged_path_str
196215
);
@@ -200,15 +219,17 @@ mod tests {
200219
staged_path_str
201220
);
202221
let uuid_str = staged_path_str
203-
.strip_prefix("s3://my-bucket/path/to/table/_delta_log/00000000000000000042.")
222+
.strip_prefix(
223+
"s3://my-bucket/path/to/table/_delta_log/_staged_commits/00000000000000000042.",
224+
)
204225
.and_then(|s| s.strip_suffix(".json"))
205226
.expect("Staged path should have expected format");
206227
uuid::Uuid::parse_str(uuid_str).expect("Staged path should contain a valid UUID");
207228
}
208229

209230
#[cfg(feature = "catalog-managed")]
210231
#[tokio::test]
211-
async fn catalog_managed_tables_block_transactions() {
232+
async fn disallow_filesystem_committer_for_catalog_managed_tables() {
212233
let storage = Arc::new(InMemory::new());
213234
let table_root = Url::parse("memory:///").unwrap();
214235
let engine = DefaultEngine::new(storage.clone());
@@ -225,18 +246,16 @@ mod tests {
225246
let snapshot = crate::snapshot::SnapshotBuilder::new_for(table_root)
226247
.build(&engine)
227248
.unwrap();
228-
// Try to create a transaction with FileSystemCommitter
249+
// Try to commit a transaction with FileSystemCommitter
229250
let committer = Box::new(FileSystemCommitter::new());
230-
let err = snapshot.transaction(committer).unwrap_err();
251+
let err = snapshot
252+
.transaction(committer)
253+
.unwrap()
254+
.commit(&engine)
255+
.unwrap_err();
231256
assert!(matches!(
232257
err,
233-
crate::Error::Unsupported(e) if e.contains("Writes are not yet supported for catalog-managed tables")
258+
crate::Error::Generic(e) if e.contains("The FileSystemCommitter cannot be used to commit to catalog-managed tables. Please provide a committer for your catalog via Transaction::with_committer().")
234259
));
235-
// after allowing writes, we will check that this disallows default committer for
236-
// catalog-managed tables.
237-
// assert!(matches!(
238-
// err,
239-
// crate::Error::Generic(e) if e.contains("Cannot use the default committer for a catalog-managed table")
240-
// ));
241260
}
242261
}

kernel/src/engine/arrow_expression/evaluate_expression.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,6 @@ pub fn evaluate_predicate(
380380
}
381381
}
382382
(Expression::Literal(lit), Expression::Literal(Scalar::Array(ad))) => {
383-
#[allow(deprecated)]
384383
let exists = ad.array_elements().contains(lit);
385384
Ok(BooleanArray::from(vec![exists]))
386385
}

kernel/src/engine/arrow_expression/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ impl Scalar {
113113
Array(data) => {
114114
let builder = builder_as!(array::ListBuilder<Box<dyn ArrayBuilder>>);
115115
for _ in 0..num_rows {
116-
#[allow(deprecated)]
117116
for value in data.array_elements() {
118117
value.append_to(builder.values(), 1)?;
119118
}
@@ -218,7 +217,6 @@ impl ArrayData {
218217
pub fn to_arrow(&self) -> DeltaResult<ArrayRef> {
219218
let arrow_data_type = ArrowDataType::try_from_kernel(self.array_type().element_type())?;
220219

221-
#[allow(deprecated)]
222220
let elements = self.array_elements();
223221
let mut builder = array::make_builder(&arrow_data_type, elements.len());
224222
for element in elements {

kernel/src/expressions/scalars.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ impl ArrayData {
9494
&self.tpe
9595
}
9696

97-
#[deprecated(
98-
note = "These fields will be removed eventually and are unstable. See https://github.com/delta-io/delta-kernel-rs/issues/291"
99-
)]
10097
pub fn array_elements(&self) -> &[Scalar] {
10198
&self.elements
10299
}
@@ -923,7 +920,6 @@ mod tests {
923920

924921
#[test]
925922
fn test_arrays() {
926-
#[allow(deprecated)]
927923
let array = Scalar::Array(ArrayData {
928924
tpe: ArrayType::new(DataType::INTEGER, false),
929925
elements: vec![Scalar::Integer(1), Scalar::Integer(2), Scalar::Integer(3)],
@@ -1173,7 +1169,6 @@ mod tests {
11731169
let Scalar::Array(array_data) = scalar else {
11741170
panic!("Expected Array scalar");
11751171
};
1176-
#[allow(deprecated)]
11771172
let elements = array_data.array_elements();
11781173
assert_eq!(elements.len(), 3);
11791174
assert!(!array_data.array_type().contains_null());
@@ -1206,7 +1201,6 @@ mod tests {
12061201
panic!("Expected Array scalar");
12071202
};
12081203

1209-
#[allow(deprecated)]
12101204
let elements = array_data.array_elements();
12111205
assert_eq!(elements.len(), 3);
12121206
assert!(array_data.array_type().contains_null());

0 commit comments

Comments
 (0)