Skip to content

Commit b4f8b91

Browse files
authored
Merge pull request JanKaul#258 from JanKaul/write-manifests-concurrently
Write manifests concurrently
2 parents 7807ad8 + 9f7e359 commit b4f8b91

File tree

3 files changed

+139
-19
lines changed

3 files changed

+139
-19
lines changed

iceberg-rust/src/table/manifest.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
use std::{
1919
collections::HashSet,
20+
future::Future,
2021
io::Read,
2122
iter::{repeat, Map, Repeat, Zip},
2223
sync::Arc,
@@ -26,6 +27,7 @@ use apache_avro::{
2627
to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
2728
Writer as AvroWriter,
2829
};
30+
use futures::TryFutureExt;
2931
use iceberg_rust_spec::{
3032
manifest::{Content, ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
3133
manifest_list::{self, FieldSummary, ManifestListEntry},
@@ -636,6 +638,49 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
636638
.await?;
637639
Ok(self.manifest)
638640
}
641+
642+
/// Finishes writing the manifest file concurrently.
643+
///
644+
/// This method completes the manifest writing process by finalizing the writer
645+
/// and returning both the manifest list entry and a future for the actual file upload.
646+
/// The upload operation can be awaited separately, allowing for concurrent processing
647+
/// of multiple manifest writes.
648+
///
649+
/// # Arguments
650+
///
651+
/// * `object_store` - The object store implementation used to persist the manifest file
652+
///
653+
/// # Returns
654+
///
655+
/// Returns a tuple containing:
656+
/// - `ManifestListEntry`: The completed manifest entry with its `manifest_length` increased by the size of the written bytes
657+
/// - `impl Future<Output = Result<(), Error>>`: A future that performs the actual file upload; the store's put result is discarded
658+
///
659+
/// # Errors
660+
///
661+
/// Returns an error if:
662+
/// - The writer cannot be finalized
663+
/// - Upload failures are not reported by this call; they surface when the returned future is awaited
664+
pub(crate) fn finish_concurrently(
665+
mut self,
666+
object_store: Arc<dyn ObjectStore>,
667+
) -> Result<(ManifestListEntry, impl Future<Output = Result<(), Error>>), Error> {
668+
let manifest_bytes = self.writer.into_inner()?;
669+
670+
let manifest_length: i64 = manifest_bytes.len() as i64;
671+
672+
self.manifest.manifest_length += manifest_length;
673+
674+
let path = strip_prefix(&self.manifest.manifest_path).as_str().into();
675+
let future = async move {
676+
object_store
677+
.put(&path, manifest_bytes.into())
678+
.map_ok(|_| ()) // callers only need success/failure, so the put result is dropped
679+
.map_err(Error::from)
680+
.await
681+
};
682+
Ok((self.manifest, future))
683+
}
639684
}
640685

641686
#[allow(clippy::type_complexity)]

iceberg-rust/src/table/manifest_list.rs

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use std::{
66
collections::{HashMap, HashSet},
7+
future::Future,
78
io::{Cursor, Read},
89
iter::{repeat, Map, Repeat, Zip},
910
sync::Arc,
@@ -12,7 +13,7 @@ use std::{
1213
use apache_avro::{
1314
types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
1415
};
15-
use futures::{future::join_all, TryStreamExt};
16+
use futures::{future::join_all, TryFutureExt, TryStreamExt};
1617
use iceberg_rust_spec::{
1718
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
1819
manifest_list::{
@@ -700,6 +701,24 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
700701
.await
701702
}
702703

704+
/// Appends data files to a single manifest without an entry filter, deferring the upload.
///
/// Forwards all arguments to `append_filtered_concurrently`, passing `None` as the
/// filter, and hands back the future that call produces.
///
/// # Arguments
///
/// * `data_files` - Iterator over the manifest entries to append
/// * `snapshot_id` - Snapshot id forwarded to `append_filtered_concurrently`
/// * `object_store` - Object store forwarded to `append_filtered_concurrently`
/// * `content` - Content type (data or deletes) of the manifest being written
///
/// # Returns
///
/// On success, a future resolving to `Result<(), Error>`. The caller must await
/// that future as well; awaiting this method alone does not drive it.
///
/// # Errors
///
/// Propagates any error returned by `append_filtered_concurrently`.
#[inline]
705+
pub(crate) async fn append_concurrently(
706+
&mut self,
707+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
708+
snapshot_id: i64,
709+
object_store: Arc<dyn ObjectStore>,
710+
content: Content,
711+
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
712+
self.append_filtered_concurrently(
713+
data_files,
714+
snapshot_id,
715+
None::<fn(&Result<ManifestEntry, Error>) -> bool>,
716+
object_store,
717+
content,
718+
)
719+
.await
720+
}
721+
703722
/// Appends data files to a single manifest with optional filtering and finalizes the manifest list.
704723
///
705724
/// This method extends the basic `append` functionality by providing the ability to
@@ -754,6 +773,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
754773
/// object_store,
755774
/// ).await?;
756775
/// ```
776+
#[inline]
757777
pub(crate) async fn append_filtered(
758778
&mut self,
759779
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
@@ -762,6 +782,20 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
762782
object_store: Arc<dyn ObjectStore>,
763783
content: Content,
764784
) -> Result<(), Error> {
785+
self.append_filtered_concurrently(data_files, snapshot_id, filter, object_store, content)
786+
.await?
787+
.await?;
788+
Ok(())
789+
}
790+
791+
pub(crate) async fn append_filtered_concurrently(
792+
&mut self,
793+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
794+
snapshot_id: i64,
795+
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
796+
object_store: Arc<dyn ObjectStore>,
797+
content: Content,
798+
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
765799
let selected_manifest = match content {
766800
Content::Data => self.selected_data_manifest.take(),
767801
Content::Deletes => self.selected_delete_manifest.take(),
@@ -820,11 +854,11 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
820854
manifest_writer.append(manifest_entry?)?;
821855
}
822856

823-
let manifest = manifest_writer.finish(object_store.clone()).await?;
857+
let (manifest, future) = manifest_writer.finish_concurrently(object_store.clone())?;
824858

825859
self.writer.append_ser(manifest)?;
826860

827-
Ok(())
861+
Ok(future)
828862
}
829863

830864
/// Appends data files by splitting them across multiple manifests and finalizes the manifest list.
@@ -875,15 +909,15 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
875909
/// object_store,
876910
/// ).await?;
877911
/// ```
878-
pub(crate) async fn append_multiple(
912+
pub(crate) async fn append_multiple_concurrently(
879913
&mut self,
880914
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
881915
snapshot_id: i64,
882916
n_splits: u32,
883917
object_store: Arc<dyn ObjectStore>,
884918
content: Content,
885-
) -> Result<(), Error> {
886-
self.append_multiple_filtered(
919+
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
920+
self.append_multiple_filtered_concurrently(
887921
data_files,
888922
snapshot_id,
889923
n_splits,
@@ -955,6 +989,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
955989
/// object_store,
956990
/// ).await?;
957991
/// ```
992+
#[inline]
958993
pub(crate) async fn append_multiple_filtered(
959994
&mut self,
960995
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
@@ -964,6 +999,28 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
964999
object_store: Arc<dyn ObjectStore>,
9651000
content: Content,
9661001
) -> Result<(), Error> {
1002+
self.append_multiple_filtered_concurrently(
1003+
data_files,
1004+
snapshot_id,
1005+
n_splits,
1006+
filter,
1007+
object_store,
1008+
content,
1009+
)
1010+
.await?
1011+
.await?;
1012+
Ok(())
1013+
}
1014+
1015+
pub(crate) async fn append_multiple_filtered_concurrently(
1016+
&mut self,
1017+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
1018+
snapshot_id: i64,
1019+
n_splits: u32,
1020+
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
1021+
object_store: Arc<dyn ObjectStore>,
1022+
content: Content,
1023+
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
9671024
let partition_fields = self
9681025
.table_metadata
9691026
.current_partition_fields(self.branch.as_deref())?;
@@ -1032,7 +1089,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
10321089
split_datafiles(data_files, bounds, &partition_column_names, n_splits)?
10331090
};
10341091

1035-
let manifest_futures = splits
1092+
let (manifests, manifest_futures) = splits
10361093
.into_iter()
10371094
.map(|entries| {
10381095
let manifest_location = self.next_manifest_location();
@@ -1050,17 +1107,17 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
10501107
manifest_writer.append(manifest_entry)?;
10511108
}
10521109

1053-
Ok::<_, Error>(manifest_writer.finish(object_store.clone()))
1110+
manifest_writer.finish_concurrently(object_store.clone())
10541111
})
1055-
.collect::<Result<Vec<_>, _>>()?;
1056-
1057-
let manifests = futures::future::try_join_all(manifest_futures).await?;
1112+
.collect::<Result<(Vec<_>, Vec<_>), _>>()?;
10581113

10591114
for manifest in manifests {
10601115
self.writer.append_ser(manifest)?;
10611116
}
10621117

1063-
Ok(())
1118+
let future = futures::future::try_join_all(manifest_futures).map_ok(|_| ());
1119+
1120+
Ok(future)
10641121
}
10651122

10661123
pub(crate) async fn finish(

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
*/
44

55
use std::collections::HashSet;
6+
use std::future::Future;
7+
use std::pin::Pin;
68
use std::{collections::HashMap, sync::Arc};
79

810
use bytes::Bytes;
11+
use futures::future;
912
use iceberg_rust_spec::manifest_list::{
1013
manifest_list_schema_v1, manifest_list_schema_v2, Content, ManifestListEntry,
1114
};
@@ -327,6 +330,10 @@ impl Operation {
327330

328331
// Write manifest files
329332
// Split manifest file if limit is exceeded
333+
#[allow(clippy::type_complexity)]
334+
let mut futures: Vec<
335+
Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
336+
> = Vec::new();
330337
for (content, files, n_files) in [
331338
(Content::Data, Either::Left(new_datafile_iter), n_data_files),
332339
(
@@ -339,26 +346,37 @@ impl Operation {
339346
let n_splits = manifest_list_writer.n_splits(n_files, content);
340347

341348
if n_splits == 0 {
342-
manifest_list_writer
343-
.append(files, snapshot_id, object_store.clone(), content)
349+
let future = manifest_list_writer
350+
.append_concurrently(
351+
files,
352+
snapshot_id,
353+
object_store.clone(),
354+
content,
355+
)
344356
.await?;
357+
futures.push(Box::pin(future));
345358
} else {
346-
manifest_list_writer
347-
.append_multiple(
359+
let future = manifest_list_writer
360+
.append_multiple_concurrently(
348361
files,
349362
snapshot_id,
350363
n_splits,
351364
object_store.clone(),
352365
content,
353366
)
354367
.await?;
368+
futures.push(Box::pin(future));
355369
}
356370
}
357371
}
358372

359-
let new_manifest_list_location = manifest_list_writer
360-
.finish(snapshot_id, object_store)
361-
.await?;
373+
let manifest_future = future::try_join_all(futures);
374+
375+
let (_, new_manifest_list_location) = future::try_join(
376+
manifest_future,
377+
manifest_list_writer.finish(snapshot_id, object_store),
378+
)
379+
.await?;
362380

363381
let snapshot_operation = match (n_data_files, n_delete_files) {
364382
(0, 0) => return Ok((None, Vec::new())),

0 commit comments

Comments
 (0)