@@ -5,7 +5,7 @@ use arrow::array::{Int64Array, StringArray};
55use arrow:: datatypes:: { DataType , Field , Schema as ArrowSchema } ;
66use arrow:: record_batch:: RecordBatch ;
77use futures:: stream;
8- use futures:: { StreamExt , TryStreamExt } ;
8+ use futures:: StreamExt ;
99
1010use iceberg_rust:: arrow:: read:: read;
1111use iceberg_rust:: arrow:: write:: write_parquet_partitioned;
@@ -166,13 +166,14 @@ async fn test_table_transaction_overwrite() {
166166
167167 for manifest_entry in final_manifest_entries {
168168 let manifest_entries = vec ! [ manifest_entry] ;
169- let data_files = table
169+ let mut data_files = table
170170 . datafiles ( & manifest_entries, None , ( None , None ) )
171171 . await
172172 . expect ( "Failed to read data files" ) ;
173173
174174 data_files
175- . try_for_each ( |( _, entry) | {
175+ . try_for_each ( |result| {
176+ let ( _, entry) = result?;
176177 total_data_files += 1 ;
177178
178179 // Count files by partition
@@ -193,9 +194,8 @@ async fn test_table_transaction_overwrite() {
193194 }
194195 }
195196
196- futures :: future :: ready ( Ok ( ( ) ) )
197+ Ok :: < _ , Error > ( ( ) )
197198 } )
198- . await
199199 . expect ( "Failed to process data files" ) ;
200200 }
201201
@@ -219,17 +219,16 @@ async fn test_table_transaction_overwrite() {
219219 let mut all_manifest_entries = Vec :: new ( ) ;
220220 for manifest_entry in final_manifest_entries_for_read {
221221 let manifest_entries = vec ! [ manifest_entry] ;
222- let data_files = table
222+ let mut data_files = table
223223 . datafiles ( & manifest_entries, None , ( None , None ) )
224224 . await
225225 . expect ( "Failed to read data files for verification" ) ;
226226
227227 data_files
228- . try_for_each ( |( _ , entry ) | {
229- all_manifest_entries. push ( entry ) ;
230- futures :: future :: ready ( Ok ( ( ) ) )
228+ . try_for_each ( |result | {
229+ all_manifest_entries. push ( result? . 1 ) ;
230+ Ok :: < _ , Error > ( ( ) )
231231 } )
232- . await
233232 . expect ( "Failed to collect manifest entries" ) ;
234233 }
235234
@@ -409,36 +408,35 @@ async fn create_files_to_overwrite_for_partition(
409408
410409 // Read the data files from this manifest
411410 let manifest_entries = vec ! [ manifest_entry] ;
412- let data_files = table
411+ let mut data_files = table
413412 . datafiles ( & manifest_entries, None , ( None , None ) )
414413 . await ?;
415414
416415 let mut files_to_overwrite_in_manifest = Vec :: new ( ) ;
417416
418417 // Find files that match the target partition
419- data_files
420- . try_for_each ( | ( _, manifest_entry) | {
421- // Check if this file belongs to the target partition
422- let should_overwrite = manifest_entry
423- . data_file ( )
424- . partition ( )
425- . get ( "region" )
426- . and_then ( |v| v. as_ref ( ) )
427- . and_then ( |v| match v {
428- iceberg_rust_spec:: spec:: values:: Value :: String ( s) => Some ( s. as_str ( ) ) ,
429- _ => None ,
430- } )
431- . map ( |region| region == target_partition_value)
432- . unwrap_or ( false ) ;
433-
434- if should_overwrite {
435- files_to_overwrite_in_manifest
436- . push ( manifest_entry. data_file ( ) . file_path ( ) . to_owned ( ) ) ;
437- }
418+ data_files. try_for_each ( |result| {
419+ let ( _, manifest_entry) = result? ;
420+ // Check if this file belongs to the target partition
421+ let should_overwrite = manifest_entry
422+ . data_file ( )
423+ . partition ( )
424+ . get ( "region" )
425+ . and_then ( |v| v. as_ref ( ) )
426+ . and_then ( |v| match v {
427+ iceberg_rust_spec:: spec:: values:: Value :: String ( s) => Some ( s. as_str ( ) ) ,
428+ _ => None ,
429+ } )
430+ . map ( |region| region == target_partition_value)
431+ . unwrap_or ( false ) ;
432+
433+ if should_overwrite {
434+ files_to_overwrite_in_manifest
435+ . push ( manifest_entry. data_file ( ) . file_path ( ) . to_owned ( ) ) ;
436+ }
438437
439- futures:: future:: ready ( Ok ( ( ) ) )
440- } )
441- . await ?;
438+ Ok :: < _ , Error > ( ( ) )
439+ } ) ?;
442440
443441 // Add to the mapping if there are files to overwrite in this manifest
444442 if !files_to_overwrite_in_manifest. is_empty ( ) {
0 commit comments