
Commit 558a81a

Author: Jan Kaul

add tracing to writing parquet

1 parent: d04a600

3 files changed, +8 −2 lines


Cargo.lock

Lines changed: 1 addition & 0 deletions (generated file; diff not rendered)

iceberg-rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -32,6 +32,7 @@ smallvec = { version = "1.14.0", features = ["const_generics"] }
 sqlparser = { workspace = true }
 thiserror = { workspace = true }
 thrift = { version = "0.17.0", default-features = false }
+tracing = { workspace = true }
 tokio = { version = "1.43", features = ["sync"] }
 url = { workspace = true }
 uuid = { workspace = true }
```
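The new dependency uses `workspace = true`, so its version is inherited from the `[workspace.dependencies]` table in the repository's root Cargo.toml. A minimal sketch of what that root entry presumably looks like (the actual version lives in the root manifest, which is not part of this diff; `0.1` is an assumption based on the current tracing release line):

```toml
# Root Cargo.toml — hypothetical sketch, not shown in this commit
[workspace.dependencies]
tracing = "0.1"
```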

iceberg-rust/src/arrow/write.rs

Lines changed: 6 additions & 2 deletions
```diff
@@ -40,6 +40,7 @@ use object_store::{buffered::BufWriter, ObjectStore};
 use std::fmt::Write;
 use std::sync::Arc;
 use tokio::task::JoinSet;
+use tracing::instrument;
 
 use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError, record_batch::RecordBatch};
 use futures::Stream;
```
```diff
@@ -65,7 +66,7 @@ use super::partition::PartitionStream;
 
 const MAX_PARQUET_SIZE: usize = 512_000_000;
 
-#[inline]
+#[instrument(skip(table, batches), fields(table_name = %table.identifier().name()))]
 /// Writes Arrow record batches as partitioned Parquet files.
 ///
 /// This function writes Arrow record batches to Parquet files, partitioning them according
```
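The change swaps `#[inline]` for tracing's `#[instrument]` attribute, which wraps each call to the function in a span: arguments become span fields unless listed in `skip(...)`, and `fields(...)` adds computed fields, with `%` meaning "record via Display" and `?` meaning "record via Debug". A minimal, self-contained sketch of the same pattern, using stand-in types since the crate's `Table` is not reproduced here:

```rust
use tracing::instrument;

// Stand-ins for the crate's types, for illustration only.
struct Identifier { name: String }
impl Identifier { fn name(&self) -> &str { &self.name } }
struct Table { ident: Identifier }
impl Table { fn identifier(&self) -> &Identifier { &self.ident } }
struct RecordBatchStandIn;

// `skip(table, batches)` keeps large (or non-Debug) arguments out of the span;
// `table_name = %table.identifier().name()` records the name via its Display impl.
#[instrument(skip(table, batches), fields(table_name = %table.identifier().name()))]
async fn write_parquet_partitioned(table: &Table, batches: Vec<RecordBatchStandIn>) {
    // This event is emitted inside the span created by #[instrument].
    tracing::info!(batch_count = batches.len(), "writing parquet");
}
```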
```diff
@@ -94,7 +95,7 @@ pub async fn write_parquet_partitioned(
     store_parquet_partitioned(table, batches, branch, None).await
 }
 
-#[inline]
+#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
 /// Writes equality delete records as partitioned Parquet files.
 ///
 /// This function writes Arrow record batches containing equality delete records to Parquet files,
```
```diff
@@ -125,6 +126,7 @@ pub async fn write_equality_deletes_parquet_partitioned(
     store_parquet_partitioned(table, batches, branch, Some(equality_ids)).await
 }
 
+#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
 /// Stores Arrow record batches as partitioned Parquet files.
 ///
 /// This is an internal function that handles the core storage logic for both regular data files
```
```diff
@@ -268,6 +270,7 @@ async fn store_parquet_partitioned(
 type ArrowSender = Sender<(String, FileMetaData)>;
 type ArrowReciever = Receiver<(String, FileMetaData)>;
 
+#[instrument(skip(batches, object_store), fields(data_location, equality_ids = ?equality_ids))]
 /// Writes a stream of Arrow record batches to multiple Parquet files.
 ///
 /// This internal function handles the low-level details of writing record batches to Parquet files,
```
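In this hunk, `data_location` appears in `fields(...)` without a value: that declares an empty span field, which the function body can presumably fill in once the file location is known. A sketch of that pattern under this assumption (`next_file_location` is a hypothetical helper; passing the value directly to `record` requires tracing 0.1.37 or newer):

```rust
use tracing::instrument;

// Hypothetical helper that picks the next Parquet file path.
fn next_file_location() -> String {
    "data/00000-0.parquet".to_string()
}

// `data_location` is declared empty here and recorded later.
#[instrument(fields(data_location))]
async fn write_parquet_files() {
    let location = next_file_location();
    // Fill in the span field once the value exists.
    tracing::Span::current().record("data_location", location.as_str());
}
```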
```diff
@@ -438,6 +441,7 @@ fn generate_partition_path(
         .collect::<Result<String, ArrowError>>()
 }
 
+#[instrument(skip(schema, object_store), fields(data_location))]
 /// Creates a new Arrow writer for writing record batches to a Parquet file.
 ///
 /// This internal function creates a new buffered writer and configures it with
```
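None of these spans are emitted anywhere unless the embedding application installs a subscriber; this commit only adds the instrumentation. A minimal sketch using the separate tracing-subscriber crate (an assumption — it is not a dependency added by this commit):

```rust
fn main() {
    // Print spans and events to stdout at DEBUG level and below.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Calling any of the #[instrument]-ed functions from here on
    // produces spans with the fields declared above.
}
```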
