
Commit 0e46fe3

Merge pull request JanKaul#210 from JanKaul/add-tracing
Add tracing
2 parents d04a600 + 028acf7 commit 0e46fe3

File tree

4 files changed: +199 −2 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered.)

datafusion_iceberg/src/table.rs

Lines changed: 191 additions & 0 deletions
@@ -1479,6 +1479,197 @@ mod tests {
         };
     }
 
+    #[tokio::test]
+    pub async fn test_datafusion_table_insert_truncate_partitioned() {
+        let object_store = ObjectStoreBuilder::memory();
+
+        let catalog: Arc<dyn Catalog> = Arc::new(
+            SqlCatalog::new("sqlite://", "test", object_store)
+                .await
+                .unwrap(),
+        );
+
+        let schema = Schema::builder()
+            .with_struct_field(StructField {
+                id: 1,
+                name: "id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 2,
+                name: "customer_id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 3,
+                name: "product_id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 4,
+                name: "date".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Date),
+                doc: None,
+            })
+            .with_struct_field(StructField {
+                id: 5,
+                name: "amount".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Int),
+                doc: None,
+            })
+            .build()
+            .unwrap();
+
+        let partition_spec = PartitionSpec::builder()
+            .with_partition_field(PartitionField::new(
+                2,
+                1000,
+                "customer_id_truncate",
+                Transform::Truncate(2),
+            ))
+            .build()
+            .expect("Failed to create partition spec");
+
+        let table = Table::builder()
+            .with_name("orders")
+            .with_location("/test/orders")
+            .with_schema(schema)
+            .with_partition_spec(partition_spec)
+            .build(&["test".to_owned()], catalog)
+            .await
+            .expect("Failed to create table");
+
+        let table = Arc::new(DataFusionTable::from(table));
+
+        let ctx = SessionContext::new();
+
+        ctx.register_table("orders", table.clone()).unwrap();
+
+        ctx.sql(
+            "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
+                (1, 123, 1, '2020-01-01', 1),
+                (2, 234, 1, '2020-01-01', 1),
+                (3, 345, 1, '2020-01-01', 3),
+                (4, 123, 2, '2020-02-02', 1),
+                (5, 123, 1, '2020-02-02', 2),
+                (6, 345, 3, '2020-02-02', 3);",
+        )
+        .await
+        .expect("Failed to create query plan for insert")
+        .collect()
+        .await
+        .expect("Failed to insert values into table");
+
+        let batches = ctx
+            .sql("select product_id, sum(amount) from orders where customer_id = 123 group by product_id;")
+            .await
+            .expect("Failed to create plan for select")
+            .collect()
+            .await
+            .expect("Failed to execute select query");
+
+        for batch in batches {
+            if batch.num_rows() != 0 {
+                let (product_ids, amounts) = (
+                    batch
+                        .column(0)
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap(),
+                    batch
+                        .column(1)
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap(),
+                );
+                for (product_id, amount) in product_ids.iter().zip(amounts) {
+                    if product_id.unwrap() == 1 {
+                        assert_eq!(amount.unwrap(), 3)
+                    } else if product_id.unwrap() == 2 {
+                        assert_eq!(amount.unwrap(), 1)
+                    } else if product_id.unwrap() == 3 {
+                        assert_eq!(amount.unwrap(), 0)
+                    } else {
+                        panic!("Unexpected order id")
+                    }
+                }
+            }
+        }
+
+        ctx.sql(
+            "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
+                (7, 123, 3, '2020-01-03', 1),
+                (8, 234, 1, '2020-01-03', 2),
+                (9, 234, 2, '2020-01-03', 1),
+                (10, 123, 2, '2020-01-04', 3),
+                (11, 345, 1, '2020-01-04', 2),
+                (12, 234, 3, '2020-01-04', 1),
+                (13, 123, 1, '2020-01-05', 4),
+                (14, 345, 2, '2020-01-05', 2),
+                (15, 234, 3, '2020-01-05', 3),
+                (16, 234, 3, '2020-01-05', 3),
+                (17, 123, 3, '2020-01-06', 1),
+                (18, 234, 1, '2020-01-06', 2),
+                (19, 234, 2, '2020-01-06', 1),
+                (20, 123, 2, '2020-01-07', 3),
+                (21, 345, 1, '2020-01-07', 2),
+                (22, 234, 3, '2020-01-07', 1),
+                (23, 123, 1, '2020-01-08', 4),
+                (24, 345, 2, '2020-01-08', 2),
+                (25, 234, 3, '2020-01-08', 3);",
+        )
+        .await
+        .expect("Failed to create query plan for insert")
+        .collect()
+        .await
+        .expect("Failed to insert values into table");
+
+        let batches = ctx
+            .sql("select product_id, sum(amount) from orders where customer_id = 123 group by product_id;")
+            .await
+            .expect("Failed to create plan for select")
+            .collect()
+            .await
+            .expect("Failed to execute select query");
+
+        for batch in batches {
+            if batch.num_rows() != 0 {
+                let (product_ids, amounts) = (
+                    batch
+                        .column(0)
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap(),
+                    batch
+                        .column(1)
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap(),
+                );
+                for (product_id, amount) in product_ids.iter().zip(amounts) {
+                    match product_id.unwrap() {
+                        1 => assert_eq!(amount.unwrap(), 11),
+                        2 => assert_eq!(amount.unwrap(), 7),
+                        3 => assert_eq!(amount.unwrap(), 2),
+                        _ => panic!("Unexpected order id"),
+                    }
+                }
+            }
+        }
+
+        if let Tabular::Table(table) = table.tabular.read().await.deref() {
+            assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
+        };
+    }
+
     #[tokio::test]
     pub async fn test_datafusion_table_branch_insert() {
         let object_store = ObjectStoreBuilder::memory();
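For context on the partition layout this test exercises: Transform::Truncate(2) on customer_id buckets each row by truncating the integer down to a multiple of the width. A minimal sketch of the integer truncate transform, assuming the standard Iceberg spec definition (illustrative only, not the iceberg-rust internals):

    // Integer truncate transform as defined by the Iceberg spec:
    // truncate(W, v) = v - (((v % W) + W) % W). Illustrative sketch only.
    fn truncate(width: i64, value: i64) -> i64 {
        value - (((value % width) + width) % width)
    }

    fn main() {
        // The customer_id values used in the test, partitioned with Truncate(2).
        for customer_id in [123i64, 234, 345] {
            println!("customer_id {customer_id} -> partition {}", truncate(2, customer_id));
        }
        // Prints 123 -> 122, 234 -> 234, 345 -> 344: three distinct partition values.
    }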

iceberg-rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ smallvec = { version = "1.14.0", features = ["const_generics"] }
 sqlparser = { workspace = true }
 thiserror = { workspace = true }
 thrift = { version = "0.17.0", default-features = false }
+tracing = { workspace = true }
 tokio = { version = "1.43", features = ["sync"] }
 url = { workspace = true }
 uuid = { workspace = true }

iceberg-rust/src/arrow/write.rs

Lines changed: 6 additions & 2 deletions
@@ -40,6 +40,7 @@ use object_store::{buffered::BufWriter, ObjectStore};
 use std::fmt::Write;
 use std::sync::Arc;
 use tokio::task::JoinSet;
+use tracing::instrument;
 
 use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError, record_batch::RecordBatch};
 use futures::Stream;
@@ -65,7 +66,7 @@ use super::partition::PartitionStream;
 
 const MAX_PARQUET_SIZE: usize = 512_000_000;
 
-#[inline]
+#[instrument(skip(table, batches), fields(table_name = %table.identifier().name()))]
 /// Writes Arrow record batches as partitioned Parquet files.
 ///
 /// This function writes Arrow record batches to Parquet files, partitioning them according
@@ -94,7 +95,7 @@ pub async fn write_parquet_partitioned(
     store_parquet_partitioned(table, batches, branch, None).await
 }
 
-#[inline]
+#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
 /// Writes equality delete records as partitioned Parquet files.
 ///
 /// This function writes Arrow record batches containing equality delete records to Parquet files,
@@ -125,6 +126,7 @@ pub async fn write_equality_deletes_parquet_partitioned(
     store_parquet_partitioned(table, batches, branch, Some(equality_ids)).await
 }
 
+#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
 /// Stores Arrow record batches as partitioned Parquet files.
 ///
 /// This is an internal function that handles the core storage logic for both regular data files
@@ -268,6 +270,7 @@ async fn store_parquet_partitioned(
 type ArrowSender = Sender<(String, FileMetaData)>;
 type ArrowReciever = Receiver<(String, FileMetaData)>;
 
+#[instrument(skip(batches, object_store), fields(data_location, equality_ids = ?equality_ids))]
 /// Writes a stream of Arrow record batches to multiple Parquet files.
 ///
 /// This internal function handles the low-level details of writing record batches to Parquet files,
@@ -438,6 +441,7 @@ fn generate_partition_path(
         .collect::<Result<String, ArrowError>>()
 }
 
+#[instrument(skip(schema, object_store), fields(data_location))]
 /// Creates a new Arrow writer for writing record batches to a Parquet file.
 ///
 /// This internal function creates a new buffered writer and configures it with
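The #[instrument] attributes added here only produce output once a tracing subscriber is installed by the consuming application. A minimal sketch of how a downstream binary might surface the new spans, assuming the separate tracing-subscriber crate (which this commit does not add):

    // Hypothetical consumer-side setup; `tracing_subscriber` is an assumption
    // here and is not part of this change.
    fn main() {
        // Emit spans and events to stdout, including fields such as `table_name`
        // and `equality_ids` recorded by the #[instrument] attributes above.
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::TRACE)
            .with_target(true)
            .init();

        // ... run the Iceberg write path; calls like write_parquet_partitioned
        // are now wrapped in spans carrying the recorded fields.
    }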
