1 file changed: +2 -4 lines changed
@@ -65,6 +65,7 @@ use crate::{
6565use super :: partition:: PartitionStream ;
6666
6767const MAX_PARQUET_SIZE : usize = 512_000_000 ;
68+ const COMPRESSION_FACTOR : usize = 200 ;
6869
6970#[ instrument( skip( table, batches) , fields( table_name = %table. identifier( ) . name( ) ) ) ]
7071/// Writes Arrow record batches as partitioned Parquet files.
@@ -341,7 +342,7 @@ async fn write_parquet_files(
341342 let batch_size = record_batch_size ( & batch) ;
342343 let new_size = state. bytes_written + batch_size;
343344
344- if new_size > MAX_PARQUET_SIZE {
345+ if new_size > COMPRESSION_FACTOR * MAX_PARQUET_SIZE {
345346 // Send current writer to channel
346347 let finished_writer = state. writer ;
347348 let file = finished_writer. 1 . close ( ) . await ?;
@@ -362,9 +363,6 @@ async fn write_parquet_files(
362363 state. bytes_written = batch_size;
363364 } else {
364365 state. bytes_written = new_size;
365- if new_size % 64_000_000 >= 32_000_000 {
366- state. writer . 1 . flush ( ) . await ?;
367- }
368366 }
369367
370368 state. writer . 1 . write ( & batch) . await ?;
You can’t perform that action at this time.
0 commit comments