Skip to content
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
aab78d6
Add REE file column helpers
gbrgr Nov 4, 2025
ee21cab
Add helper tests
gbrgr Nov 4, 2025
37b52e2
Add constants
gbrgr Nov 4, 2025
44463a0
Add support for _file constant
gbrgr Nov 4, 2025
b5449f6
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 4, 2025
e034009
Update tests
gbrgr Nov 4, 2025
4f0a4f1
Fix clippy warning
gbrgr Nov 4, 2025
51f76d3
Fix doc test
gbrgr Nov 4, 2025
d84e16b
Track in field ids
gbrgr Nov 4, 2025
984dacd
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 4, 2025
bd478cb
Add test
gbrgr Nov 4, 2025
8593db0
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 4, 2025
9b186c7
Allow repeated virtual file column selection
gbrgr Nov 4, 2025
30ae5fb
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 5, 2025
adf0da0
Refactor into own transformer step
gbrgr Nov 7, 2025
f4336a8
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 7, 2025
ef3a965
Revert "Refactor into own transformer step"
gbrgr Nov 7, 2025
534490b
Avoid special casing in batch creation
gbrgr Nov 7, 2025
04bf463
.
gbrgr Nov 7, 2025
9e88edf
Modify record batch transformer to support reserved fields
gbrgr Nov 12, 2025
060b45d
Add metadata column helper functions
gbrgr Nov 12, 2025
8572dae
Store fields instead of constants
gbrgr Nov 12, 2025
f273add
Add comment
gbrgr Nov 12, 2025
5aa92ae
Adapt comment
gbrgr Nov 12, 2025
c05b886
.
gbrgr Nov 12, 2025
33bb0ad
Adapt error message
gbrgr Nov 12, 2025
42167ff
Consider field_id range
gbrgr Nov 12, 2025
cbc6b17
Merge remote-tracking branch 'upstream/main' into feature/gb/file-column
gbrgr Nov 13, 2025
977c813
Merge remote-tracking branch 'upstream/main' into feature/gb/file-column
gbrgr Nov 13, 2025
83443aa
Use REE encoding in record batch transformer
gbrgr Nov 14, 2025
35aba12
Fix clippy errors
gbrgr Nov 14, 2025
830e462
Format
gbrgr Nov 14, 2025
4eb8a63
Add `with_file_path_column` helper
gbrgr Nov 14, 2025
9d41b7f
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 17, 2025
f3573e9
Merge branch 'feature/gb/file-column' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
73777bb
Merge branch 'main' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
22fcdd4
Port _file path column changes
gbrgr Nov 17, 2025
0b8f15b
Rename field
gbrgr Nov 17, 2025
7ce462e
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 17, 2025
6cc22ec
Merge branch 'feature/gb/file-column' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
88e1113
Adapt metadata column
gbrgr Nov 17, 2025
fca14bd
Rename method
gbrgr Nov 17, 2025
b7da6d3
Undo some changes
gbrgr Nov 17, 2025
7ebdf87
.
gbrgr Nov 17, 2025
edbc72a
Re-refactor tests
gbrgr Nov 17, 2025
4a08ee6
Undo reader test changes
gbrgr Nov 17, 2025
671fd4f
.
gbrgr Nov 17, 2025
b4d8f81
Merge branch 'feature/gb/file-column' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
facb89a
Merge branch 'main' into feature/gb/file-column-inc
gbrgr Nov 18, 2025
8ed457f
Move import
gbrgr Nov 18, 2025
4cde4fa
PR comments
gbrgr Nov 18, 2025
0ea00bc
Merge branch 'feature/gb/file-column-inc' of github.com:RelationalAI/…
gbrgr Nov 18, 2025
d4cf3fe
PR comments
gbrgr Nov 19, 2025
1257d72
.
gbrgr Nov 19, 2025
c0db19f
.
gbrgr Nov 19, 2025
a4c8425
Clippy fix
gbrgr Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions crates/iceberg/src/arrow/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,15 @@ async fn process_incremental_append_task(
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, and virtual field addition (like _file)
let mut record_batch_transformer =
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids).build();
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
.with_constant(
crate::metadata_columns::RESERVED_FIELD_ID_FILE,
crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
)?
.build();

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ mod incremental;
pub use incremental::*;
pub use reader::*;
pub use value::*;

// Re-export delete file constants for convenience
pub(crate) use crate::metadata_columns::{
RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH,
RESERVED_FIELD_ID_POS,
};
/// Partition value calculator for computing partition values
pub mod partition_value_calculator;
pub use partition_value_calculator::*;
Expand Down
57 changes: 25 additions & 32 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Reserved field ID for the file path (_file) column per Iceberg spec
/// This is dead code for now but will be used when we add the _file column support.
#[allow(dead_code)]
pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646;

/// Column name for the file path metadata column per Iceberg spec
/// This is dead code for now but will be used when we add the _file column support.
#[allow(dead_code)]
pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file";

/// Reserved field ID for the file path column used in delete file reading.
pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546;

/// Column name for the file path metadata column used in delete file reading.
pub(crate) const RESERVED_COL_NAME_FILE_PATH: &str = "file_path";

/// Reserved field ID for the position column used in delete file reading.
pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483545;

/// Column name for the position metadata column used in delete file reading.
pub(crate) const RESERVED_COL_NAME_POS: &str = "pos";

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
Expand Down Expand Up @@ -282,12 +261,20 @@ impl ArrowReader {
initial_stream_builder
};

// Filter out metadata fields for Parquet projection (they don't exist in files)
let project_field_ids_without_metadata: Vec<i32> = task
.project_field_ids
.iter()
.filter(|&&id| !is_metadata_field(id))
.copied()
.collect();

// Create projection mask based on field IDs
// - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
// - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
// - If fallback IDs: position-based projection (missing_field_ids=true)
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&project_field_ids_without_metadata,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
Expand All @@ -298,16 +285,20 @@ impl ArrowReader {
record_batch_stream_builder.with_projection(projection_mask.clone());

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering.
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, partition constants, and virtual field addition (like _file)
let mut record_batch_transformer_builder =
RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids())
.with_constant(
RESERVED_FIELD_ID_FILE,
PrimitiveLiteral::String(task.data_file_path.clone()),
)?;

if let (Some(partition_spec), Some(partition_data)) =
(task.partition_spec.clone(), task.partition.clone())
{
record_batch_transformer_builder =
record_batch_transformer_builder.with_partition(partition_spec, partition_data);
record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to do this for incremental scan as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have partition information (yet? not sure if it is needed at all) in the incremental scan

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we get wrong results because of it? For example, if the table had partitions and some partition transforms?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure; this partition handling has only been added recently. We may just add the same logic to the incremental tasks, but we first need to understand what the actual issue is.

}

let mut record_batch_transformer = record_batch_transformer_builder.build();
Expand Down Expand Up @@ -448,7 +439,10 @@ impl ArrowReader {
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Ok(batch) => {
// Process the record batch (type promotion, column reordering, virtual fields, etc.)
record_batch_transformer.process_record_batch(batch)
}
Err(err) => Err(err.into()),
});

Expand Down Expand Up @@ -1882,13 +1876,12 @@ mod tests {

use crate::ErrorKind;
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
use crate::arrow::{
ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE,
};
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
use crate::delete_vector::DeleteVector;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Predicate, Reference};
use crate::io::FileIO;
use crate::metadata_columns::{RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE};
use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
use crate::spec::{
DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
Expand Down
Loading
Loading