Skip to content

Commit 89a810e

Browse files
committed
add concurrent methods to write manifests
1 parent 7807ad8 commit 89a810e

File tree

2 files changed

+100
-14
lines changed

2 files changed

+100
-14
lines changed

iceberg-rust/src/table/manifest.rs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
use std::{
1919
collections::HashSet,
20+
future::Future,
2021
io::Read,
2122
iter::{repeat, Map, Repeat, Zip},
2223
sync::Arc,
@@ -26,6 +27,7 @@ use apache_avro::{
2627
to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
2728
Writer as AvroWriter,
2829
};
30+
use futures::TryFutureExt;
2931
use iceberg_rust_spec::{
3032
manifest::{Content, ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
3133
manifest_list::{self, FieldSummary, ManifestListEntry},
@@ -35,7 +37,7 @@ use iceberg_rust_spec::{
3537
util::strip_prefix,
3638
values::{Struct, Value},
3739
};
38-
use object_store::ObjectStore;
40+
use object_store::{ObjectStore, PutResult};
3941

4042
use crate::error::Error;
4143

@@ -636,6 +638,54 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
636638
.await?;
637639
Ok(self.manifest)
638640
}
641+
642+
/// Finishes writing the manifest file concurrently.
643+
///
644+
/// This method completes the manifest writing process by finalizing the writer
645+
/// and returning both the manifest list entry and a future for the actual file upload.
646+
/// The upload operation can be awaited separately, allowing for concurrent processing
647+
/// of multiple manifest writes.
648+
///
649+
/// # Arguments
650+
///
651+
/// * `object_store` - The object store implementation used to persist the manifest file
652+
///
653+
/// # Returns
654+
///
655+
/// Returns a tuple containing:
656+
/// - `ManifestListEntry`: The completed manifest entry with updated metadata
657+
/// - `impl Future<Output = Result<PutResult, Error>>`: A future that performs the actual file upload
658+
///
659+
/// # Errors
660+
///
661+
/// Returns an error if:
662+
/// - The writer cannot be finalized
663+
/// - There are issues preparing the upload operation
664+
pub(crate) fn finish_concurrently(
665+
mut self,
666+
object_store: Arc<dyn ObjectStore>,
667+
) -> Result<
668+
(
669+
ManifestListEntry,
670+
impl Future<Output = Result<PutResult, Error>>,
671+
),
672+
Error,
673+
> {
674+
let manifest_bytes = self.writer.into_inner()?;
675+
676+
let manifest_length: i64 = manifest_bytes.len() as i64;
677+
678+
self.manifest.manifest_length += manifest_length;
679+
680+
let path = strip_prefix(&self.manifest.manifest_path).as_str().into();
681+
let future = async move {
682+
object_store
683+
.put(&path, manifest_bytes.into())
684+
.map_err(Error::from)
685+
.await
686+
};
687+
Ok((self.manifest, future))
688+
}
639689
}
640690

641691
#[allow(clippy::type_complexity)]

iceberg-rust/src/table/manifest_list.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use std::{
66
collections::{HashMap, HashSet},
7+
future::Future,
78
io::{Cursor, Read},
89
iter::{repeat, Map, Repeat, Zip},
910
sync::Arc,
@@ -23,7 +24,7 @@ use iceberg_rust_spec::{
2324
table_metadata::{FormatVersion, TableMetadata},
2425
util::strip_prefix,
2526
};
26-
use object_store::ObjectStore;
27+
use object_store::{ObjectStore, PutResult};
2728
use smallvec::SmallVec;
2829

2930
use crate::{
@@ -700,6 +701,20 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
700701
.await
701702
}
702703

704+
pub(crate) async fn append_filtered(
705+
&mut self,
706+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
707+
snapshot_id: i64,
708+
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
709+
object_store: Arc<dyn ObjectStore>,
710+
content: Content,
711+
) -> Result<(), Error> {
712+
self.append_filtered_concurrently(data_files, snapshot_id, filter, object_store, content)
713+
.await?
714+
.await?;
715+
Ok(())
716+
}
717+
703718
/// Appends data files to a single manifest with optional filtering and finalizes the manifest list.
704719
///
705720
/// This method extends the basic `append` functionality by providing the ability to
@@ -754,14 +769,14 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
754769
/// object_store,
755770
/// ).await?;
756771
/// ```
757-
pub(crate) async fn append_filtered(
772+
pub(crate) async fn append_filtered_concurrently(
758773
&mut self,
759774
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
760775
snapshot_id: i64,
761776
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
762777
object_store: Arc<dyn ObjectStore>,
763778
content: Content,
764-
) -> Result<(), Error> {
779+
) -> Result<impl Future<Output = Result<PutResult, Error>>, Error> {
765780
let selected_manifest = match content {
766781
Content::Data => self.selected_data_manifest.take(),
767782
Content::Deletes => self.selected_delete_manifest.take(),
@@ -820,11 +835,11 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
820835
manifest_writer.append(manifest_entry?)?;
821836
}
822837

823-
let manifest = manifest_writer.finish(object_store.clone()).await?;
838+
let (manifest, future) = manifest_writer.finish_concurrently(object_store.clone())?;
824839

825840
self.writer.append_ser(manifest)?;
826841

827-
Ok(())
842+
Ok(future)
828843
}
829844

830845
/// Appends data files by splitting them across multiple manifests and finalizes the manifest list.
@@ -894,6 +909,27 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
894909
.await
895910
}
896911

912+
pub(crate) async fn append_multiple_filtered(
913+
&mut self,
914+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
915+
snapshot_id: i64,
916+
n_splits: u32,
917+
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
918+
object_store: Arc<dyn ObjectStore>,
919+
content: Content,
920+
) -> Result<(), Error> {
921+
self.append_multiple_filtered_concurrently(
922+
data_files,
923+
snapshot_id,
924+
n_splits,
925+
filter,
926+
object_store,
927+
content,
928+
)
929+
.await?
930+
.await?;
931+
Ok(())
932+
}
897933
/// Appends data files across multiple manifests with optional filtering and finalizes the manifest list.
898934
///
899935
/// This method extends the `append_multiple` functionality by providing the ability to
@@ -955,15 +991,15 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
955991
/// object_store,
956992
/// ).await?;
957993
/// ```
958-
pub(crate) async fn append_multiple_filtered(
994+
pub(crate) async fn append_multiple_filtered_concurrently(
959995
&mut self,
960996
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
961997
snapshot_id: i64,
962998
n_splits: u32,
963999
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
9641000
object_store: Arc<dyn ObjectStore>,
9651001
content: Content,
966-
) -> Result<(), Error> {
1002+
) -> Result<impl Future<Output = Result<Vec<PutResult>, Error>>, Error> {
9671003
let partition_fields = self
9681004
.table_metadata
9691005
.current_partition_fields(self.branch.as_deref())?;
@@ -1032,7 +1068,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
10321068
split_datafiles(data_files, bounds, &partition_column_names, n_splits)?
10331069
};
10341070

1035-
let manifest_futures = splits
1071+
let (manifests, manifest_futures) = splits
10361072
.into_iter()
10371073
.map(|entries| {
10381074
let manifest_location = self.next_manifest_location();
@@ -1050,17 +1086,17 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
10501086
manifest_writer.append(manifest_entry)?;
10511087
}
10521088

1053-
Ok::<_, Error>(manifest_writer.finish(object_store.clone()))
1089+
manifest_writer.finish_concurrently(object_store.clone())
10541090
})
1055-
.collect::<Result<Vec<_>, _>>()?;
1056-
1057-
let manifests = futures::future::try_join_all(manifest_futures).await?;
1091+
.collect::<Result<(Vec<_>, Vec<_>), _>>()?;
10581092

10591093
for manifest in manifests {
10601094
self.writer.append_ser(manifest)?;
10611095
}
10621096

1063-
Ok(())
1097+
let future = futures::future::try_join_all(manifest_futures);
1098+
1099+
Ok(future)
10641100
}
10651101

10661102
pub(crate) async fn finish(

0 commit comments

Comments
 (0)