Skip to content

Commit d7faf0b

Browse files
committed
create concurrent methods for manifest_list
1 parent 89a810e commit d7faf0b

File tree

1 file changed

+54
-14
lines changed

1 file changed

+54
-14
lines changed

iceberg-rust/src/table/manifest_list.rs

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -701,18 +701,22 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
701701
.await
702702
}
703703

704-
pub(crate) async fn append_filtered(
704+
#[inline]
705+
pub(crate) async fn append_concurrently(
705706
&mut self,
706707
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
707708
snapshot_id: i64,
708-
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
709709
object_store: Arc<dyn ObjectStore>,
710710
content: Content,
711-
) -> Result<(), Error> {
712-
self.append_filtered_concurrently(data_files, snapshot_id, filter, object_store, content)
713-
.await?
714-
.await?;
715-
Ok(())
711+
) -> Result<impl Future<Output = Result<PutResult, Error>>, Error> {
712+
self.append_filtered_concurrently(
713+
data_files,
714+
snapshot_id,
715+
None::<fn(&Result<ManifestEntry, Error>) -> bool>,
716+
object_store,
717+
content,
718+
)
719+
.await
716720
}
717721

718722
/// Appends data files to a single manifest with optional filtering and finalizes the manifest list.
@@ -769,6 +773,21 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
769773
/// object_store,
770774
/// ).await?;
771775
/// ```
776+
#[inline]
777+
pub(crate) async fn append_filtered(
778+
&mut self,
779+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
780+
snapshot_id: i64,
781+
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
782+
object_store: Arc<dyn ObjectStore>,
783+
content: Content,
784+
) -> Result<(), Error> {
785+
self.append_filtered_concurrently(data_files, snapshot_id, filter, object_store, content)
786+
.await?
787+
.await?;
788+
Ok(())
789+
}
790+
772791
pub(crate) async fn append_filtered_concurrently(
773792
&mut self,
774793
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
@@ -909,27 +928,25 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
909928
.await
910929
}
911930

912-
pub(crate) async fn append_multiple_filtered(
931+
pub(crate) async fn append_multiple_concurrently(
913932
&mut self,
914933
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
915934
snapshot_id: i64,
916935
n_splits: u32,
917-
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
918936
object_store: Arc<dyn ObjectStore>,
919937
content: Content,
920-
) -> Result<(), Error> {
938+
) -> Result<impl Future<Output = Result<Vec<PutResult>, Error>>, Error> {
921939
self.append_multiple_filtered_concurrently(
922940
data_files,
923941
snapshot_id,
924942
n_splits,
925-
filter,
943+
None::<fn(&Result<ManifestEntry, Error>) -> bool>,
926944
object_store,
927945
content,
928946
)
929-
.await?
930-
.await?;
931-
Ok(())
947+
.await
932948
}
949+
933950
/// Appends data files across multiple manifests with optional filtering and finalizes the manifest list.
934951
///
935952
/// This method extends the `append_multiple` functionality by providing the ability to
@@ -991,6 +1008,29 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
9911008
/// object_store,
9921009
/// ).await?;
9931010
/// ```
1011+
#[inline]
1012+
pub(crate) async fn append_multiple_filtered(
1013+
&mut self,
1014+
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
1015+
snapshot_id: i64,
1016+
n_splits: u32,
1017+
filter: Option<impl Fn(&Result<ManifestEntry, Error>) -> bool>,
1018+
object_store: Arc<dyn ObjectStore>,
1019+
content: Content,
1020+
) -> Result<(), Error> {
1021+
self.append_multiple_filtered_concurrently(
1022+
data_files,
1023+
snapshot_id,
1024+
n_splits,
1025+
filter,
1026+
object_store,
1027+
content,
1028+
)
1029+
.await?
1030+
.await?;
1031+
Ok(())
1032+
}
1033+
9941034
pub(crate) async fn append_multiple_filtered_concurrently(
9951035
&mut self,
9961036
data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,

0 commit comments

Comments
 (0)