33*/
44
55use std:: collections:: HashSet ;
6+ use std:: future:: Future ;
7+ use std:: pin:: Pin ;
68use std:: { collections:: HashMap , sync:: Arc } ;
79
810use bytes:: Bytes ;
11+ use futures:: future;
912use iceberg_rust_spec:: manifest_list:: {
1013 manifest_list_schema_v1, manifest_list_schema_v2, Content , ManifestListEntry ,
1114} ;
@@ -327,6 +330,10 @@ impl Operation {
327330
328331 // Write manifest files
329332 // Split manifest file if limit is exceeded
333+ #[ allow( clippy:: type_complexity) ]
334+ let mut futures: Vec <
335+ Pin < Box < dyn Future < Output = Result < ( ) , Error > > + Send > > ,
336+ > = Vec :: new ( ) ;
330337 for ( content, files, n_files) in [
331338 ( Content :: Data , Either :: Left ( new_datafile_iter) , n_data_files) ,
332339 (
@@ -339,26 +346,37 @@ impl Operation {
339346 let n_splits = manifest_list_writer. n_splits ( n_files, content) ;
340347
341348 if n_splits == 0 {
342- manifest_list_writer
343- . append ( files, snapshot_id, object_store. clone ( ) , content)
349+ let future = manifest_list_writer
350+ . append_concurrently (
351+ files,
352+ snapshot_id,
353+ object_store. clone ( ) ,
354+ content,
355+ )
344356 . await ?;
357+ futures. push ( Box :: pin ( future) ) ;
345358 } else {
346- manifest_list_writer
347- . append_multiple (
359+ let future = manifest_list_writer
360+ . append_multiple_concurrently (
348361 files,
349362 snapshot_id,
350363 n_splits,
351364 object_store. clone ( ) ,
352365 content,
353366 )
354367 . await ?;
368+ futures. push ( Box :: pin ( future) ) ;
355369 }
356370 }
357371 }
358372
359- let new_manifest_list_location = manifest_list_writer
360- . finish ( snapshot_id, object_store)
361- . await ?;
373+ let manifest_future = future:: try_join_all ( futures) ;
374+
375+ let ( _, new_manifest_list_location) = future:: try_join (
376+ manifest_future,
377+ manifest_list_writer. finish ( snapshot_id, object_store) ,
378+ )
379+ . await ?;
362380
363381 let snapshot_operation = match ( n_data_files, n_delete_files) {
364382 ( 0 , 0 ) => return Ok ( ( None , Vec :: new ( ) ) ) ,
0 commit comments