@@ -13,6 +13,7 @@ use std::{
1313 hash:: Hash ,
1414 pin:: Pin ,
1515 task:: { Context , Poll } ,
16+ thread:: available_parallelism,
1617} ;
1718
1819use arrow:: {
@@ -33,7 +34,7 @@ use futures::{
3334use itertools:: { iproduct, Itertools } ;
3435
3536use iceberg_rust_spec:: { partition:: BoundPartitionField , spec:: values:: Value } ;
36- use once_map :: OnceMap ;
37+ use lru :: LruCache ;
3738use pin_project_lite:: pin_project;
3839
3940use crate :: error:: Error ;
@@ -58,7 +59,7 @@ pin_project! {
5859 #[ pin]
5960 record_batches: Pin <Box <dyn Stream <Item = Result <RecordBatch , ArrowError >> + Send >>,
6061 partition_fields: & ' a [ BoundPartitionField <' a>] ,
61- partition_streams: OnceMap <Vec <Value >, RecordBatchSender >,
62+ partition_streams: LruCache <Vec <Value >, RecordBatchSender >,
6263 queue: VecDeque <Result <( Vec <Value >, RecordBatchReceiver ) , Error >>,
6364 sends: Vec <( RecordBatchSender , RecordBatch ) >,
6465 }
@@ -72,7 +73,7 @@ impl<'a> PartitionStream<'a> {
7273 Self {
7374 record_batches,
7475 partition_fields,
75- partition_streams : OnceMap :: new ( ) ,
76+ partition_streams : LruCache :: unbounded ( ) ,
7677 queue : VecDeque :: new ( ) ,
7778 sends : Vec :: new ( ) ,
7879 }
@@ -109,13 +110,22 @@ impl Stream for PartitionStream<'_> {
109110 }
110111 }
111112
113+ // Limit the number of open partition_streams by available parallelism
114+ if this. partition_streams . len ( ) > available_parallelism ( ) . unwrap ( ) . get ( ) {
115+ if let Some ( ( _, mut sender) ) = this. partition_streams . pop_lru ( ) {
116+ sender. close_channel ( ) ;
117+ }
118+ }
119+
112120 match this. record_batches . as_mut ( ) . poll_next ( cx) {
113121 Poll :: Pending => {
114122 break Poll :: Pending ;
115123 }
116124 Poll :: Ready ( None ) => {
117- for sender in this. partition_streams . read_only_view ( ) . values ( ) {
118- sender. clone ( ) . close_channel ( ) ;
125+ while let Some ( ( _, sender) ) = this. partition_streams . pop_lru ( ) {
126+ if !sender. is_closed ( ) {
127+ sender. clone ( ) . close_channel ( ) ;
128+ }
119129 }
120130 break Poll :: Ready ( None ) ;
121131 }
@@ -127,16 +137,16 @@ impl Stream for PartitionStream<'_> {
127137 let ( partition_values, batch) = result?;
128138
129139 let sender = if let Some ( sender) =
130- this. partition_streams . get_cloned ( & partition_values)
140+ this. partition_streams . get ( & partition_values) . cloned ( )
131141 {
132142 sender
133143 } else {
144+ let ( sender, reciever) = channel ( 1 ) ;
145+ this. queue
146+ . push_back ( Ok ( ( partition_values. clone ( ) , reciever) ) ) ;
134147 this. partition_streams
135- . insert_cloned ( partition_values, |key| {
136- let ( sender, reciever) = channel ( 1 ) ;
137- this. queue . push_back ( Ok ( ( key. clone ( ) , reciever) ) ) ;
138- sender
139- } )
148+ . push ( partition_values, sender. clone ( ) ) ;
149+ sender
140150 } ;
141151
142152 this. sends . push ( ( sender, batch) ) ;
0 commit comments