@@ -1,11 +1,8 @@
-use std::sync::Arc;
-
-use datafusion::{
-    arrow::{array::Int64Array, error::ArrowError},
-    prelude::SessionContext,
-};
+use datafusion::{arrow::error::ArrowError, assert_batches_eq, prelude::SessionContext};
 use datafusion_iceberg::catalog::catalog::IcebergCatalog;
 use futures::stream;
+use iceberg_rust::catalog::identifier::Identifier;
+use iceberg_rust::catalog::tabular::Tabular;
 use iceberg_rust::{
     arrow::write::write_equality_deletes_parquet_partitioned,
     catalog::Catalog,
@@ -18,6 +15,7 @@ use iceberg_rust::{
     table::Table,
 };
 use iceberg_sql_catalog::SqlCatalog;
+use std::sync::Arc;
 
 #[tokio::test]
 pub async fn test_equality_delete() {
@@ -73,7 +71,7 @@ pub async fn test_equality_delete() {
         .build()
         .expect("Failed to create partition spec");
 
-    let mut table = Table::builder()
+    let table = Table::builder()
         .with_name("orders")
         .with_location("/test/orders")
         .with_schema(schema)
@@ -84,7 +82,7 @@ pub async fn test_equality_delete() {
 
     let ctx = SessionContext::new();
 
-    let datafusion_catalog = Arc::new(IcebergCatalog::new(catalog, None).await.unwrap());
+    let datafusion_catalog = Arc::new(IcebergCatalog::new(catalog.clone(), None).await.unwrap());
 
     ctx.register_catalog("warehouse", datafusion_catalog);
 
@@ -104,51 +102,45 @@ pub async fn test_equality_delete() {
104102 . expect ( "Failed to insert values into table" ) ;
105103
106104 let batches = ctx
107- . sql ( "select product_id, sum(amount) from warehouse.test.orders group by product_id; " )
105+ . sql ( "select product_id, sum(amount) from warehouse.test.orders group by product_id order by product_id " )
108106 . await
109107 . expect ( "Failed to create plan for select" )
110108 . collect ( )
111109 . await
112110 . expect ( "Failed to execute select query" ) ;
113111
114- for batch in batches {
115- if batch. num_rows ( ) != 0 {
116- let ( product_ids, amounts) = (
117- batch
118- . column ( 0 )
119- . as_any ( )
120- . downcast_ref :: < Int64Array > ( )
121- . unwrap ( ) ,
122- batch
123- . column ( 1 )
124- . as_any ( )
125- . downcast_ref :: < Int64Array > ( )
126- . unwrap ( ) ,
127- ) ;
128- for ( product_id, amount) in product_ids. iter ( ) . zip ( amounts) {
129- if product_id. unwrap ( ) == 1 {
130- assert_eq ! ( amount. unwrap( ) , 7 )
131- } else if product_id. unwrap ( ) == 2 {
132- assert_eq ! ( amount. unwrap( ) , 1 )
133- } else if product_id. unwrap ( ) == 3 {
134- assert_eq ! ( amount. unwrap( ) , 3 )
135- } else {
136- panic ! ( "Unexpected order id" )
137- }
138- }
139- }
140- }
112+ let expected = [
113+ "+------------+-----------------------------------+" ,
114+ "| product_id | sum(warehouse.test.orders.amount) |" ,
115+ "+------------+-----------------------------------+" ,
116+ "| 1 | 7 |" ,
117+ "| 2 | 1 |" ,
118+ "| 3 | 3 |" ,
119+ "+------------+-----------------------------------+" ,
120+ ] ;
121+ assert_batches_eq ! ( expected, & batches) ;
141122
142123 let batches = ctx
143124 . sql (
144- "SELECT id, customer_id, product_id, date FROM warehouse.test.orders WHERE customer_id = 1; " ,
125+ "SELECT id, customer_id, product_id, date FROM warehouse.test.orders WHERE customer_id = 1 order by id " ,
145126 )
146127 . await
147128 . expect ( "Failed to create query plan for insert" )
148129 . collect ( )
149130 . await
150131 . expect ( "Failed to insert values into table" ) ;
151132
133+ let expected = [
134+ "+----+-------------+------------+------------+" ,
135+ "| id | customer_id | product_id | date |" ,
136+ "+----+-------------+------------+------------+" ,
137+ "| 1 | 1 | 1 | 2020-01-01 |" ,
138+ "| 4 | 1 | 2 | 2020-02-02 |" ,
139+ "| 5 | 1 | 1 | 2020-02-02 |" ,
140+ "+----+-------------+------------+------------+" ,
141+ ] ;
142+ assert_batches_eq ! ( expected, & batches) ;
143+
152144 let files = write_equality_deletes_parquet_partitioned (
153145 & table,
154146 stream:: iter ( batches. into_iter ( ) . map ( Ok :: < _ , ArrowError > ) ) ,
@@ -158,6 +150,16 @@ pub async fn test_equality_delete() {
     .await
     .unwrap();
 
+    // Load the latest table version, which includes the inserted rows
+    let Tabular::Table(mut table) = catalog
+        .clone()
+        .load_tabular(&Identifier::new(&["test".to_string()], "orders"))
+        .await
+        .unwrap()
+    else {
+        panic!("Tabular should be a table");
+    };
+
     table
         .new_transaction(None)
         .append_delete(files)
@@ -166,38 +168,20 @@ pub async fn test_equality_delete() {
         .unwrap();
 
     let batches = ctx
-        .sql("select product_id, sum(amount) from warehouse.test.orders group by product_id;")
+        .sql("select product_id, sum(amount) from warehouse.test.orders group by product_id order by product_id")
         .await
         .expect("Failed to create plan for select")
         .collect()
         .await
         .expect("Failed to execute select query");
 
-    for batch in batches {
-        if batch.num_rows() != 0 {
-            let (product_ids, amounts) = (
-                batch
-                    .column(0)
-                    .as_any()
-                    .downcast_ref::<Int64Array>()
-                    .unwrap(),
-                batch
-                    .column(1)
-                    .as_any()
-                    .downcast_ref::<Int64Array>()
-                    .unwrap(),
-            );
-            for (product_id, amount) in product_ids.iter().zip(amounts) {
-                if product_id.unwrap() == 1 {
-                    assert_eq!(amount.unwrap(), 4)
-                } else if product_id.unwrap() == 2 {
-                    assert_eq!(amount.unwrap(), 0)
-                } else if product_id.unwrap() == 3 {
-                    assert_eq!(amount.unwrap(), 3)
-                } else {
-                    panic!("Unexpected order id")
-                }
-            }
-        }
-    }
+    let expected = [
+        "+------------+-----------------------------------+",
+        "| product_id | sum(warehouse.test.orders.amount) |",
+        "+------------+-----------------------------------+",
+        "| 1          | 4                                 |",
+        "| 3          | 3                                 |",
+        "+------------+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &batches);
 }