Skip to content

Commit ab6904f

Browse files
authored
Merge pull request JanKaul#221 from JanKaul/fix-partitioned-write
Fix partitioned write
2 parents 92b2774 + 8d593aa commit ab6904f

File tree

4 files changed: +237 −294 lines changed

4 files changed: +237 −294 lines changed

datafusion_iceberg/src/materialized_view/delta_queries/fork_node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub fn fork_node(plan: Arc<LogicalPlan>) -> (ForkNode, ForkNode) {
3434
let parallelism = std::thread::available_parallelism().unwrap().get();
3535
let (sender, receiver): (Vec<_>, Vec<_>) = iter::repeat_n((), parallelism)
3636
.map(|_| {
37-
let (sender, receiver) = channel(1);
37+
let (sender, receiver) = channel(0);
3838
(
3939
Arc::new(Mutex::new(Some(sender))),
4040
Arc::new(Mutex::new(Some(receiver))),
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use std::sync::Arc;
2+
3+
use datafusion::{
4+
arrow::array::{Float64Array, Int64Array},
5+
common::tree_node::{TransformedResult, TreeNode},
6+
execution::{context::SessionContext, SessionStateBuilder},
7+
};
8+
use datafusion_expr::ScalarUDF;
9+
use iceberg_rust::object_store::ObjectStoreBuilder;
10+
use iceberg_sql_catalog::SqlCatalogList;
11+
12+
use datafusion_iceberg::{
13+
catalog::catalog_list::IcebergCatalogList,
14+
planner::{iceberg_transform, IcebergQueryPlanner, RefreshMaterializedView},
15+
};
16+
17+
// Integration test for the "fix partitioned write" change: load the TPC-H
// lineitem CSV into a DataFusion external table, create an Iceberg table
// partitioned by month(L_SHIPDATE), insert the CSV rows into it, then read
// the table back and aggregate quantities per part key to verify the write.
// NOTE(review): the interleaved "NN+" lines below are diff line-number
// markers from the rendered commit page, not Rust tokens.
#[tokio::test]
18+
async fn test_insert_csv() {
19+
// In-memory object store plus a SQLite-backed Iceberg catalog list:
// the entire test runs without external services.
let object_store = ObjectStoreBuilder::memory();
20+
let iceberg_catalog_list = Arc::new(
21+
SqlCatalogList::new("sqlite://", object_store)
22+
.await
23+
.unwrap(),
24+
);
25+
26+
// Wrap the Iceberg catalog list so DataFusion can resolve its catalogs.
let catalog_list = {
27+
Arc::new(
28+
IcebergCatalogList::new(iceberg_catalog_list.clone())
29+
.await
30+
.unwrap(),
31+
)
32+
};
33+
34+
// Session state wired with the Iceberg query planner so the Iceberg DDL/DML
// statements below are planned by the Iceberg-aware planner.
let state = SessionStateBuilder::new()
35+
.with_default_features()
36+
.with_catalog_list(catalog_list)
37+
.with_query_planner(Arc::new(IcebergQueryPlanner::new()))
38+
.build();
39+
40+
let ctx = SessionContext::new_with_state(state);
41+
42+
// Register the materialized-view refresh UDF; it is not invoked by the
// statements in this test but is part of the standard session setup.
ctx.register_udf(ScalarUDF::from(RefreshMaterializedView::new(
43+
iceberg_catalog_list,
44+
)));
45+
46+
// Source table: raw TPC-H lineitem data from a headerless CSV fixture.
let sql = "CREATE EXTERNAL TABLE lineitem (
47+
L_ORDERKEY BIGINT NOT NULL,
48+
L_PARTKEY BIGINT NOT NULL,
49+
L_SUPPKEY BIGINT NOT NULL,
50+
L_LINENUMBER INT NOT NULL,
51+
L_QUANTITY DOUBLE NOT NULL,
52+
L_EXTENDED_PRICE DOUBLE NOT NULL,
53+
L_DISCOUNT DOUBLE NOT NULL,
54+
L_TAX DOUBLE NOT NULL,
55+
L_RETURNFLAG CHAR NOT NULL,
56+
L_LINESTATUS CHAR NOT NULL,
57+
L_SHIPDATE DATE NOT NULL,
58+
L_COMMITDATE DATE NOT NULL,
59+
L_RECEIPTDATE DATE NOT NULL,
60+
L_SHIPINSTRUCT VARCHAR NOT NULL,
61+
L_SHIPMODE VARCHAR NOT NULL,
62+
L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION 'testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";
63+
64+
// Every statement follows the same pattern: build the logical plan, rewrite
// it with iceberg_transform, then execute and drain the result stream.
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
65+
66+
let transformed = plan.transform(iceberg_transform).data().unwrap();
67+
68+
ctx.execute_logical_plan(transformed)
69+
.await
70+
.unwrap()
71+
.collect()
72+
.await
73+
.expect("Failed to execute query plan.");
74+
75+
// Namespace for the Iceberg target table.
let sql = "CREATE SCHEMA warehouse.tpch;";
76+
77+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
78+
79+
let transformed = plan.transform(iceberg_transform).data().unwrap();
80+
81+
ctx.execute_logical_plan(transformed)
82+
.await
83+
.unwrap()
84+
.collect()
85+
.await
86+
.expect("Failed to execute query plan.");
87+
88+
// Target table: stored as Iceberg, partitioned by month(L_SHIPDATE) —
// the partitioned-write path this commit fixes.
let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
89+
L_ORDERKEY BIGINT NOT NULL,
90+
L_PARTKEY BIGINT NOT NULL,
91+
L_SUPPKEY BIGINT NOT NULL,
92+
L_LINENUMBER INT NOT NULL,
93+
L_QUANTITY DOUBLE NOT NULL,
94+
L_EXTENDED_PRICE DOUBLE NOT NULL,
95+
L_DISCOUNT DOUBLE NOT NULL,
96+
L_TAX DOUBLE NOT NULL,
97+
L_RETURNFLAG CHAR NOT NULL,
98+
L_LINESTATUS CHAR NOT NULL,
99+
L_SHIPDATE DATE NOT NULL,
100+
L_COMMITDATE DATE NOT NULL,
101+
L_RECEIPTDATE DATE NOT NULL,
102+
L_SHIPINSTRUCT VARCHAR NOT NULL,
103+
L_SHIPMODE VARCHAR NOT NULL,
104+
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION '/warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";
105+
106+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
107+
108+
let transformed = plan.transform(iceberg_transform).data().unwrap();
109+
110+
ctx.execute_logical_plan(transformed)
111+
.await
112+
.unwrap()
113+
.collect()
114+
.await
115+
.expect("Failed to execute query plan.");
116+
117+
// The partitioned write under test: copy every CSV row into the
// month-partitioned Iceberg table.
let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";
118+
119+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
120+
121+
let transformed = plan.transform(iceberg_transform).data().unwrap();
122+
123+
ctx.execute_logical_plan(transformed)
124+
.await
125+
.unwrap()
126+
.collect()
127+
.await
128+
.expect("Failed to execute query plan.");
129+
130+
// Read the Iceberg table back through plain SQL: sum of quantity per part key.
let batches = ctx
131+
.sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
132+
.await
133+
.expect("Failed to create plan for select")
134+
.collect()
135+
.await
136+
.expect("Failed to execute select query");
137+
138+
// Guards against the query silently returning only empty batches.
let mut once = false;
139+
140+
for batch in batches {
141+
if batch.num_rows() != 0 {
142+
// Column 0 is sum(L_QUANTITY) (Float64), column 1 is L_PARTKEY (Int64),
// matching the SELECT projection above.
let (amounts, product_ids) = (
143+
batch
144+
.column(0)
145+
.as_any()
146+
.downcast_ref::<Float64Array>()
147+
.unwrap(),
148+
batch
149+
.column(1)
150+
.as_any()
151+
.downcast_ref::<Int64Array>()
152+
.unwrap(),
153+
);
154+
// Spot-check two part keys; the expected sums presumably come from the
// checked-in testdata CSV — TODO confirm against the fixture.
for (product_id, amount) in product_ids.iter().zip(amounts) {
155+
if product_id.unwrap() == 24027 {
156+
assert_eq!(amount.unwrap(), 24.0)
157+
} else if product_id.unwrap() == 63700 {
158+
assert_eq!(amount.unwrap(), 23.0)
159+
}
160+
}
161+
// Record that at least one non-empty batch was observed.
once = true
162+
}
163+
}
164+
165+
assert!(once);
166+
}

0 commit comments

Comments (0)