Skip to content

Commit 66e3c71

Browse files
Multiple data sequence numbers per commit (#18)
1 parent 85112a1 commit 66e3c71

File tree

3 files changed

+133
-42
lines changed

3 files changed

+133
-42
lines changed

iceberg-rust/src/materialized_view/transaction/mod.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use crate::{
1414
error::Error,
1515
table::{
1616
delete_all_table_files,
17-
transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
17+
transaction::{
18+
operation::{DsnGroup, Operation as TableOperation},
19+
APPEND_INDEX, REPLACE_INDEX,
20+
},
1821
},
1922
view::transaction::operation::Operation as ViewOperation,
2023
};
@@ -105,12 +108,17 @@ impl<'view> Transaction<'view> {
105108
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
106109
if let TableOperation::Append {
107110
branch: _,
108-
data_files: old,
109-
delete_files: _,
111+
dsn_groups,
110112
additional_summary: old_lineage,
111113
} = operation
112114
{
113-
old.extend_from_slice(&files);
115+
match dsn_groups.last_mut() {
116+
Some(g) => g.data_files.extend_from_slice(&files),
117+
None => dsn_groups.push(DsnGroup {
118+
data_files: files,
119+
delete_files: vec![],
120+
}),
121+
};
114122
*old_lineage = Some(HashMap::from_iter(vec![(
115123
REFRESH_STATE.to_owned(),
116124
refresh_state.clone(),
@@ -119,8 +127,10 @@ impl<'view> Transaction<'view> {
119127
} else {
120128
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
121129
branch: self.branch.clone(),
122-
data_files: files,
123-
delete_files: Vec::new(),
130+
dsn_groups: vec![DsnGroup {
131+
data_files: files,
132+
delete_files: vec![],
133+
}],
124134
additional_summary: Some(HashMap::from_iter(vec![(
125135
REFRESH_STATE.to_owned(),
126136
refresh_state,
@@ -140,12 +150,17 @@ impl<'view> Transaction<'view> {
140150
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
141151
if let TableOperation::Append {
142152
branch: _,
143-
data_files: _,
144-
delete_files: old,
153+
dsn_groups,
145154
additional_summary: old_lineage,
146155
} = operation
147156
{
148-
old.extend_from_slice(&files);
157+
match dsn_groups.last_mut() {
158+
Some(g) => g.delete_files.extend_from_slice(&files),
159+
None => dsn_groups.push(DsnGroup {
160+
data_files: vec![],
161+
delete_files: files,
162+
}),
163+
};
149164
*old_lineage = Some(HashMap::from_iter(vec![(
150165
REFRESH_STATE.to_owned(),
151166
refresh_state.clone(),
@@ -154,8 +169,10 @@ impl<'view> Transaction<'view> {
154169
} else {
155170
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
156171
branch: self.branch.clone(),
157-
data_files: Vec::new(),
158-
delete_files: files,
172+
dsn_groups: vec![DsnGroup {
173+
data_files: vec![],
174+
delete_files: files,
175+
}],
159176
additional_summary: Some(HashMap::from_iter(vec![(
160177
REFRESH_STATE.to_owned(),
161178
refresh_state,

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::collections::HashMap;
2020
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
2121

2222
use crate::table::transaction::append::append_summary;
23+
pub use crate::table::transaction::operation::DsnGroup;
2324
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
2425

2526
use self::operation::Operation;
@@ -120,17 +121,24 @@ impl<'table> TableTransaction<'table> {
120121
let summary = append_summary(&files);
121122

122123
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
123-
if let Operation::Append {
124-
data_files: old, ..
125-
} = operation
126-
{
127-
old.extend_from_slice(&files);
124+
if let Operation::Append { dsn_groups, .. } = operation {
125+
match dsn_groups.last_mut() {
126+
Some(g) => g.data_files.extend_from_slice(&files),
127+
None => dsn_groups.push(DsnGroup {
128+
data_files: files,
129+
delete_files: vec![],
130+
}),
131+
};
132+
} else {
133+
panic!("Operation at APPEND_INDEX should be an Append");
128134
}
129135
} else {
130136
self.operations[APPEND_INDEX] = Some(Operation::Append {
131137
branch: self.branch.clone(),
132-
data_files: files,
133-
delete_files: Vec::new(),
138+
dsn_groups: vec![DsnGroup {
139+
data_files: files,
140+
delete_files: vec![],
141+
}],
134142
additional_summary: summary,
135143
});
136144
}
@@ -159,23 +167,47 @@ impl<'table> TableTransaction<'table> {
159167
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
160168
if let Operation::Append {
161169
branch: _,
162-
data_files: _,
163-
delete_files: old,
164-
additional_summary: None,
170+
dsn_groups,
171+
..
165172
} = operation
166173
{
167-
old.extend_from_slice(&files);
174+
match dsn_groups.last_mut() {
175+
Some(g) => g.delete_files.extend_from_slice(&files),
176+
None => dsn_groups.push(DsnGroup {
177+
data_files: vec![],
178+
delete_files: files,
179+
}),
180+
};
181+
} else {
182+
panic!("Operation at APPEND_INDEX should be an Append");
168183
}
169184
} else {
170185
self.operations[APPEND_INDEX] = Some(Operation::Append {
171186
branch: self.branch.clone(),
172-
data_files: Vec::new(),
173-
delete_files: files,
187+
dsn_groups: vec![DsnGroup {
188+
data_files: vec![],
189+
delete_files: files,
190+
}],
174191
additional_summary: None,
175192
});
176193
}
177194
self
178195
}
196+
/// Create a new data sequence number for subsequent appends
197+
pub fn new_data_sequence_number(mut self) -> Self {
198+
if let Some(Operation::Append {
199+
branch: _,
200+
ref mut dsn_groups,
201+
..
202+
}) = self.operations[APPEND_INDEX]
203+
{
204+
dsn_groups.push(DsnGroup {
205+
data_files: vec![],
206+
delete_files: vec![],
207+
});
208+
}
209+
self
210+
}
179211
/// Overwrites specific data files in the table with new ones
180212
///
181213
/// This operation replaces specified existing data files with new ones, rather than

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ use super::append::split_datafiles;
3838
/// The target number of datafiles per manifest is dynamic, but we don't want to go below this number.
3939
static MIN_DATAFILES_PER_MANIFEST: usize = 4;
4040

41+
#[derive(Debug, Clone)]
42+
/// Group of writes sharing a Data Sequence Number
43+
pub struct DsnGroup {
44+
/// Delete files. These apply to insert files from previous Data Sequence Groups
45+
pub delete_files: Vec<DataFile>,
46+
/// Insert files
47+
pub data_files: Vec<DataFile>,
48+
}
49+
4150
#[derive(Debug)]
4251
///Table operations
4352
pub enum Operation {
@@ -56,8 +65,7 @@ pub enum Operation {
5665
/// Append new files to the table
5766
Append {
5867
branch: Option<String>,
59-
data_files: Vec<DataFile>,
60-
delete_files: Vec<DataFile>,
68+
dsn_groups: Vec<DsnGroup>,
6169
additional_summary: Option<HashMap<String, String>>,
6270
},
6371
// /// Quickly append new files to the table
@@ -99,8 +107,7 @@ impl Operation {
99107
match self {
100108
Operation::Append {
101109
branch,
102-
data_files,
103-
delete_files,
110+
dsn_groups,
104111
additional_summary,
105112
} => {
106113
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
@@ -110,14 +117,31 @@ impl Operation {
110117
FormatVersion::V2 => manifest_list_schema_v2(),
111118
};
112119

120+
let mut dsn_offset = 0;
121+
let mut data_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![];
122+
let mut delete_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![];
123+
for dsn_group in dsn_groups.into_iter() {
124+
if !dsn_group.data_files.is_empty() || !dsn_group.delete_files.is_empty() {
125+
dsn_offset += 1;
126+
for data_file in dsn_group.data_files.into_iter() {
127+
data_files.push((data_file, dsn_offset));
128+
}
129+
for delete_file in dsn_group.delete_files.into_iter() {
130+
delete_files.push((delete_file, dsn_offset));
131+
}
132+
}
133+
}
134+
113135
let n_data_files = data_files.len();
114136
let n_delete_files = delete_files.len();
115137

116138
if n_data_files + n_delete_files == 0 {
117139
return Ok((None, Vec::new()));
118140
}
141+
let largest_dsn_offset = dsn_offset;
142+
assert!(largest_dsn_offset >= 1, "Should have exited early");
119143

120-
let data_files_iter = delete_files.iter().chain(data_files.iter());
144+
let data_files_iter = delete_files.iter().chain(data_files.iter()).map(|(x, _)| x);
121145

122146
let mut manifest_list_writer = if let Some(manifest_list_bytes) =
123147
prefetch_manifest_list(old_snapshot, &object_store)
@@ -143,25 +167,41 @@ impl Operation {
143167
let n_delete_splits =
144168
manifest_list_writer.n_splits(n_delete_files, Content::Deletes);
145169

146-
let new_datafile_iter = data_files.into_iter().map(|data_file| {
147-
ManifestEntry::builder()
170+
let new_datafile_iter = data_files.into_iter().map(|(data_file, dsn_offset)| {
171+
let mut builder = ManifestEntry::builder();
172+
builder
148173
.with_format_version(table_metadata.format_version)
149174
.with_status(Status::Added)
150-
.with_data_file(data_file)
175+
.with_data_file(data_file);
176+
// If there is only one data sequence number in this commit, we can just use sequence number inheritance
177+
// If there are multiple data sequence numbers in this commit, we need to set the data sequence number on each manifest
178+
if largest_dsn_offset > 1 {
179+
builder
180+
.with_sequence_number(table_metadata.last_sequence_number + dsn_offset);
181+
}
182+
builder
151183
.build()
152184
.map_err(crate::spec::error::Error::from)
153185
.map_err(Error::from)
154186
});
155187

156-
let new_deletefile_iter = delete_files.into_iter().map(|data_file| {
157-
ManifestEntry::builder()
158-
.with_format_version(table_metadata.format_version)
159-
.with_status(Status::Added)
160-
.with_data_file(data_file)
161-
.build()
162-
.map_err(crate::spec::error::Error::from)
163-
.map_err(Error::from)
164-
});
188+
let new_deletefile_iter =
189+
delete_files.into_iter().map(|(data_file, dsn_offset)| {
190+
let mut builder = ManifestEntry::builder();
191+
builder
192+
.with_format_version(table_metadata.format_version)
193+
.with_status(Status::Added)
194+
.with_data_file(data_file);
195+
if largest_dsn_offset > 1 {
196+
builder.with_sequence_number(
197+
table_metadata.last_sequence_number + dsn_offset,
198+
);
199+
}
200+
builder
201+
.build()
202+
.map_err(crate::spec::error::Error::from)
203+
.map_err(Error::from)
204+
});
165205

166206
let snapshot_id = generate_snapshot_id();
167207

@@ -211,11 +251,13 @@ impl Operation {
211251
(_, _) => Ok(SnapshotOperation::Overwrite),
212252
}?;
213253

254+
let snapshot_sequence_number =
255+
table_metadata.last_sequence_number + largest_dsn_offset;
214256
let mut snapshot_builder = SnapshotBuilder::default();
215257
snapshot_builder
216258
.with_snapshot_id(snapshot_id)
217259
.with_manifest_list(new_manifest_list_location)
218-
.with_sequence_number(table_metadata.last_sequence_number + 1)
260+
.with_sequence_number(snapshot_sequence_number)
219261
.with_summary(Summary {
220262
operation: snapshot_operation,
221263
other: additional_summary.unwrap_or_default(),

0 commit comments

Comments
 (0)