Skip to content

Commit 6ea7580

Browse files
committed
Fix equality delete test
Make asserts more explicit by using assert_batches_eq! and reload the table prior to applying equality deletes.
1 parent d7849b0 commit 6ea7580

File tree

1 file changed

+49
-65
lines changed

1 file changed

+49
-65
lines changed

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 49 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
use std::sync::Arc;
2-
3-
use datafusion::{
4-
arrow::{array::Int64Array, error::ArrowError},
5-
prelude::SessionContext,
6-
};
1+
use datafusion::{arrow::error::ArrowError, assert_batches_eq, prelude::SessionContext};
72
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
83
use futures::stream;
4+
use iceberg_rust::catalog::identifier::Identifier;
5+
use iceberg_rust::catalog::tabular::Tabular;
96
use iceberg_rust::{
107
arrow::write::write_equality_deletes_parquet_partitioned,
118
catalog::Catalog,
@@ -18,6 +15,7 @@ use iceberg_rust::{
1815
table::Table,
1916
};
2017
use iceberg_sql_catalog::SqlCatalog;
18+
use std::sync::Arc;
2119

2220
#[tokio::test]
2321
pub async fn test_equality_delete() {
@@ -73,7 +71,7 @@ pub async fn test_equality_delete() {
7371
.build()
7472
.expect("Failed to create partition spec");
7573

76-
let mut table = Table::builder()
74+
let table = Table::builder()
7775
.with_name("orders")
7876
.with_location("/test/orders")
7977
.with_schema(schema)
@@ -84,7 +82,7 @@ pub async fn test_equality_delete() {
8482

8583
let ctx = SessionContext::new();
8684

87-
let datafusion_catalog = Arc::new(IcebergCatalog::new(catalog, None).await.unwrap());
85+
let datafusion_catalog = Arc::new(IcebergCatalog::new(catalog.clone(), None).await.unwrap());
8886

8987
ctx.register_catalog("warehouse", datafusion_catalog);
9088

@@ -104,51 +102,45 @@ pub async fn test_equality_delete() {
104102
.expect("Failed to insert values into table");
105103

106104
let batches = ctx
107-
.sql("select product_id, sum(amount) from warehouse.test.orders group by product_id;")
105+
.sql("select product_id, sum(amount) from warehouse.test.orders group by product_id order by product_id")
108106
.await
109107
.expect("Failed to create plan for select")
110108
.collect()
111109
.await
112110
.expect("Failed to execute select query");
113111

114-
for batch in batches {
115-
if batch.num_rows() != 0 {
116-
let (product_ids, amounts) = (
117-
batch
118-
.column(0)
119-
.as_any()
120-
.downcast_ref::<Int64Array>()
121-
.unwrap(),
122-
batch
123-
.column(1)
124-
.as_any()
125-
.downcast_ref::<Int64Array>()
126-
.unwrap(),
127-
);
128-
for (product_id, amount) in product_ids.iter().zip(amounts) {
129-
if product_id.unwrap() == 1 {
130-
assert_eq!(amount.unwrap(), 7)
131-
} else if product_id.unwrap() == 2 {
132-
assert_eq!(amount.unwrap(), 1)
133-
} else if product_id.unwrap() == 3 {
134-
assert_eq!(amount.unwrap(), 3)
135-
} else {
136-
panic!("Unexpected order id")
137-
}
138-
}
139-
}
140-
}
112+
let expected = [
113+
"+------------+-----------------------------------+",
114+
"| product_id | sum(warehouse.test.orders.amount) |",
115+
"+------------+-----------------------------------+",
116+
"| 1 | 7 |",
117+
"| 2 | 1 |",
118+
"| 3 | 3 |",
119+
"+------------+-----------------------------------+",
120+
];
121+
assert_batches_eq!(expected, &batches);
141122

142123
let batches = ctx
143124
.sql(
144-
"SELECT id, customer_id, product_id, date FROM warehouse.test.orders WHERE customer_id = 1;",
125+
"SELECT id, customer_id, product_id, date FROM warehouse.test.orders WHERE customer_id = 1 order by id",
145126
)
146127
.await
147128
.expect("Failed to create query plan for insert")
148129
.collect()
149130
.await
150131
.expect("Failed to insert values into table");
151132

133+
let expected = [
134+
"+----+-------------+------------+------------+",
135+
"| id | customer_id | product_id | date |",
136+
"+----+-------------+------------+------------+",
137+
"| 1 | 1 | 1 | 2020-01-01 |",
138+
"| 4 | 1 | 2 | 2020-02-02 |",
139+
"| 5 | 1 | 1 | 2020-02-02 |",
140+
"+----+-------------+------------+------------+",
141+
];
142+
assert_batches_eq!(expected, &batches);
143+
152144
let files = write_equality_deletes_parquet_partitioned(
153145
&table,
154146
stream::iter(batches.into_iter().map(Ok::<_, ArrowError>)),
@@ -158,6 +150,16 @@ pub async fn test_equality_delete() {
158150
.await
159151
.unwrap();
160152

153+
// Load the latest table version, which includes the inserted rows
154+
let Tabular::Table(mut table) = catalog
155+
.clone()
156+
.load_tabular(&Identifier::new(&["test".to_string()], "orders"))
157+
.await
158+
.unwrap()
159+
else {
160+
panic!("Tabular should be a table");
161+
};
162+
161163
table
162164
.new_transaction(None)
163165
.append_delete(files)
@@ -166,38 +168,20 @@ pub async fn test_equality_delete() {
166168
.unwrap();
167169

168170
let batches = ctx
169-
.sql("select product_id, sum(amount) from warehouse.test.orders group by product_id;")
171+
.sql("select product_id, sum(amount) from warehouse.test.orders group by product_id order by product_id")
170172
.await
171173
.expect("Failed to create plan for select")
172174
.collect()
173175
.await
174176
.expect("Failed to execute select query");
175177

176-
for batch in batches {
177-
if batch.num_rows() != 0 {
178-
let (product_ids, amounts) = (
179-
batch
180-
.column(0)
181-
.as_any()
182-
.downcast_ref::<Int64Array>()
183-
.unwrap(),
184-
batch
185-
.column(1)
186-
.as_any()
187-
.downcast_ref::<Int64Array>()
188-
.unwrap(),
189-
);
190-
for (product_id, amount) in product_ids.iter().zip(amounts) {
191-
if product_id.unwrap() == 1 {
192-
assert_eq!(amount.unwrap(), 4)
193-
} else if product_id.unwrap() == 2 {
194-
assert_eq!(amount.unwrap(), 0)
195-
} else if product_id.unwrap() == 3 {
196-
assert_eq!(amount.unwrap(), 3)
197-
} else {
198-
panic!("Unexpected order id")
199-
}
200-
}
201-
}
202-
}
178+
let expected = [
179+
"+------------+-----------------------------------+",
180+
"| product_id | sum(warehouse.test.orders.amount) |",
181+
"+------------+-----------------------------------+",
182+
"| 1 | 4 |",
183+
"| 3 | 3 |",
184+
"+------------+-----------------------------------+",
185+
];
186+
assert_batches_eq!(expected, &batches);
203187
}

0 commit comments

Comments
 (0)