Skip to content

Commit 028acf7

Browse files
author
Jan Kaul
committed
add truncate partition test
1 parent 558a81a commit 028acf7

File tree

1 file changed

+191
-0
lines changed

1 file changed

+191
-0
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,6 +1479,197 @@ mod tests {
14791479
};
14801480
}
14811481

1482+
#[tokio::test]
1483+
pub async fn test_datafusion_table_insert_truncate_partitioned() {
1484+
let object_store = ObjectStoreBuilder::memory();
1485+
1486+
let catalog: Arc<dyn Catalog> = Arc::new(
1487+
SqlCatalog::new("sqlite://", "test", object_store)
1488+
.await
1489+
.unwrap(),
1490+
);
1491+
1492+
let schema = Schema::builder()
1493+
.with_struct_field(StructField {
1494+
id: 1,
1495+
name: "id".to_string(),
1496+
required: true,
1497+
field_type: Type::Primitive(PrimitiveType::Long),
1498+
doc: None,
1499+
})
1500+
.with_struct_field(StructField {
1501+
id: 2,
1502+
name: "customer_id".to_string(),
1503+
required: true,
1504+
field_type: Type::Primitive(PrimitiveType::Long),
1505+
doc: None,
1506+
})
1507+
.with_struct_field(StructField {
1508+
id: 3,
1509+
name: "product_id".to_string(),
1510+
required: true,
1511+
field_type: Type::Primitive(PrimitiveType::Long),
1512+
doc: None,
1513+
})
1514+
.with_struct_field(StructField {
1515+
id: 4,
1516+
name: "date".to_string(),
1517+
required: true,
1518+
field_type: Type::Primitive(PrimitiveType::Date),
1519+
doc: None,
1520+
})
1521+
.with_struct_field(StructField {
1522+
id: 5,
1523+
name: "amount".to_string(),
1524+
required: true,
1525+
field_type: Type::Primitive(PrimitiveType::Int),
1526+
doc: None,
1527+
})
1528+
.build()
1529+
.unwrap();
1530+
1531+
let partition_spec = PartitionSpec::builder()
1532+
.with_partition_field(PartitionField::new(
1533+
2,
1534+
1000,
1535+
"customer_id_truncate",
1536+
Transform::Truncate(2),
1537+
))
1538+
.build()
1539+
.expect("Failed to create partition spec");
1540+
1541+
let table = Table::builder()
1542+
.with_name("orders")
1543+
.with_location("/test/orders")
1544+
.with_schema(schema)
1545+
.with_partition_spec(partition_spec)
1546+
.build(&["test".to_owned()], catalog)
1547+
.await
1548+
.expect("Failed to create table");
1549+
1550+
let table = Arc::new(DataFusionTable::from(table));
1551+
1552+
let ctx = SessionContext::new();
1553+
1554+
ctx.register_table("orders", table.clone()).unwrap();
1555+
1556+
ctx.sql(
1557+
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
1558+
(1, 123, 1, '2020-01-01', 1),
1559+
(2, 234, 1, '2020-01-01', 1),
1560+
(3, 345, 1, '2020-01-01', 3),
1561+
(4, 123, 2, '2020-02-02', 1),
1562+
(5, 123, 1, '2020-02-02', 2),
1563+
(6, 345, 3, '2020-02-02', 3);",
1564+
)
1565+
.await
1566+
.expect("Failed to create query plan for insert")
1567+
.collect()
1568+
.await
1569+
.expect("Failed to insert values into table");
1570+
1571+
let batches = ctx
1572+
.sql("select product_id, sum(amount) from orders where customer_id = 123 group by product_id;")
1573+
.await
1574+
.expect("Failed to create plan for select")
1575+
.collect()
1576+
.await
1577+
.expect("Failed to execute select query");
1578+
1579+
for batch in batches {
1580+
if batch.num_rows() != 0 {
1581+
let (product_ids, amounts) = (
1582+
batch
1583+
.column(0)
1584+
.as_any()
1585+
.downcast_ref::<Int64Array>()
1586+
.unwrap(),
1587+
batch
1588+
.column(1)
1589+
.as_any()
1590+
.downcast_ref::<Int64Array>()
1591+
.unwrap(),
1592+
);
1593+
for (product_id, amount) in product_ids.iter().zip(amounts) {
1594+
if product_id.unwrap() == 1 {
1595+
assert_eq!(amount.unwrap(), 3)
1596+
} else if product_id.unwrap() == 2 {
1597+
assert_eq!(amount.unwrap(), 1)
1598+
} else if product_id.unwrap() == 3 {
1599+
assert_eq!(amount.unwrap(), 0)
1600+
} else {
1601+
panic!("Unexpected order id")
1602+
}
1603+
}
1604+
}
1605+
}
1606+
1607+
ctx.sql(
1608+
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
1609+
(7, 123, 3, '2020-01-03', 1),
1610+
(8, 234, 1, '2020-01-03', 2),
1611+
(9, 234, 2, '2020-01-03', 1),
1612+
(10, 123, 2, '2020-01-04', 3),
1613+
(11, 345, 1, '2020-01-04', 2),
1614+
(12, 234, 3, '2020-01-04', 1),
1615+
(13, 123, 1, '2020-01-05', 4),
1616+
(14, 345, 2, '2020-01-05', 2),
1617+
(15, 234, 3, '2020-01-05', 3),
1618+
(16, 234, 3, '2020-01-05', 3),
1619+
(17, 123, 3, '2020-01-06', 1),
1620+
(18, 234, 1, '2020-01-06', 2),
1621+
(19, 234, 2, '2020-01-06', 1),
1622+
(20, 123, 2, '2020-01-07', 3),
1623+
(21, 345, 1, '2020-01-07', 2),
1624+
(22, 234, 3, '2020-01-07', 1),
1625+
(23, 123, 1, '2020-01-08', 4),
1626+
(24, 345, 2, '2020-01-08', 2),
1627+
(25, 234, 3, '2020-01-08', 3);",
1628+
)
1629+
.await
1630+
.expect("Failed to create query plan for insert")
1631+
.collect()
1632+
.await
1633+
.expect("Failed to insert values into table");
1634+
1635+
let batches = ctx
1636+
.sql("select product_id, sum(amount) from orders where customer_id = 123 group by product_id;")
1637+
.await
1638+
.expect("Failed to create plan for select")
1639+
.collect()
1640+
.await
1641+
.expect("Failed to execute select query");
1642+
1643+
for batch in batches {
1644+
if batch.num_rows() != 0 {
1645+
let (product_ids, amounts) = (
1646+
batch
1647+
.column(0)
1648+
.as_any()
1649+
.downcast_ref::<Int64Array>()
1650+
.unwrap(),
1651+
batch
1652+
.column(1)
1653+
.as_any()
1654+
.downcast_ref::<Int64Array>()
1655+
.unwrap(),
1656+
);
1657+
for (product_id, amount) in product_ids.iter().zip(amounts) {
1658+
match product_id.unwrap() {
1659+
1 => assert_eq!(amount.unwrap(), 11),
1660+
2 => assert_eq!(amount.unwrap(), 7),
1661+
3 => assert_eq!(amount.unwrap(), 2),
1662+
_ => panic!("Unexpected order id"),
1663+
}
1664+
}
1665+
}
1666+
}
1667+
1668+
if let Tabular::Table(table) = table.tabular.read().await.deref() {
1669+
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
1670+
};
1671+
}
1672+
14821673
#[tokio::test]
14831674
pub async fn test_datafusion_table_branch_insert() {
14841675
let object_store = ObjectStoreBuilder::memory();

0 commit comments

Comments
 (0)