Skip to content

Commit 7cf46a8

Browse files
committed
improve table_scan for unpartitioned table
1 parent 02fc169 commit 7cf46a8

File tree

1 file changed

+40
-29
lines changed

1 file changed

+40
-29
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use itertools::Itertools;
2121
use lru::LruCache;
2222
use object_store::path::Path;
2323
use object_store::ObjectMeta;
24+
use std::collections::BTreeMap;
2425
use std::thread::available_parallelism;
2526
use std::{
2627
any::Any,
@@ -434,7 +435,9 @@ async fn table_scan(
434435
HashMap::new();
435436

436437
// Prune data & delete file and insert them into the according map
437-
let statistics = if let Some(physical_predicate) = physical_predicate.clone() {
438+
let (content_file_iter, statistics) = if let Some(physical_predicate) =
439+
physical_predicate.clone()
440+
{
438441
let partition_schema = Arc::new(ArrowSchema::new(table_partition_cols.clone()));
439442
let partition_column_names = partition_fields
440443
.iter()
@@ -502,31 +505,11 @@ async fn table_scan(
502505

503506
let statistics = statistics_from_datafiles(&schema, &data_files);
504507

505-
data_files
508+
let iter = data_files
506509
.into_iter()
507510
.zip(files_to_prune.into_iter())
508-
.for_each(|(manifest, prune_file)| {
509-
if prune_file && *manifest.1.status() != Status::Deleted {
510-
match manifest.1.data_file().content() {
511-
Content::Data => {
512-
data_file_groups
513-
.entry(manifest.1.data_file().partition().clone())
514-
.or_default()
515-
.push(manifest);
516-
}
517-
Content::EqualityDeletes => {
518-
equality_delete_file_groups
519-
.entry(manifest.1.data_file().partition().clone())
520-
.or_default()
521-
.push(manifest);
522-
}
523-
Content::PositionDeletes => {
524-
panic!("Position deletes not supported.")
525-
}
526-
}
527-
};
528-
});
529-
statistics
511+
.filter_map(|(manifest, prune_file)| if prune_file { Some(manifest) } else { None });
512+
(itertools::Either::Left(iter), statistics)
530513
} else {
531514
let manifests = table
532515
.manifests(snapshot_range.0, snapshot_range.1)
@@ -541,7 +524,38 @@ async fn table_scan(
541524

542525
let statistics = statistics_from_datafiles(&schema, &data_files);
543526

544-
data_files.into_iter().for_each(|manifest| {
527+
let iter = data_files.into_iter();
528+
(itertools::Either::Right(iter), statistics)
529+
};
530+
531+
if partition_fields.is_empty() {
532+
let (data_files, equality_delete_files): (Vec<_>, Vec<_>) = content_file_iter
533+
.filter(|manifest| *manifest.1.status() != Status::Deleted)
534+
.partition(|manifest| match manifest.1.data_file().content() {
535+
Content::Data => true,
536+
Content::EqualityDeletes => false,
537+
Content::PositionDeletes => panic!("Position deletes not supported."),
538+
});
539+
if !data_files.is_empty() {
540+
data_file_groups.insert(
541+
Struct {
542+
fields: Vec::new(),
543+
lookup: BTreeMap::new(),
544+
},
545+
data_files,
546+
);
547+
}
548+
if !equality_delete_files.is_empty() {
549+
equality_delete_file_groups.insert(
550+
Struct {
551+
fields: Vec::new(),
552+
lookup: BTreeMap::new(),
553+
},
554+
equality_delete_files,
555+
);
556+
}
557+
} else {
558+
content_file_iter.for_each(|manifest| {
545559
if *manifest.1.status() != Status::Deleted {
546560
match manifest.1.data_file().content() {
547561
Content::Data => {
@@ -562,8 +576,7 @@ async fn table_scan(
562576
}
563577
}
564578
});
565-
statistics
566-
};
579+
}
567580

568581
let file_source = {
569582
let physical_predicate = physical_predicate.clone();
@@ -1620,8 +1633,6 @@ mod tests {
16201633
if let Tabular::Table(table) = table.tabular.read().await.deref() {
16211634
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
16221635
};
1623-
1624-
panic!()
16251636
}
16261637

16271638
#[tokio::test]

0 commit comments

Comments
 (0)