@@ -1281,7 +1281,7 @@ fn test_hash_join_after_projection() -> Result<()> {
&JoinType::Inner,
None,
PartitionMode::Auto,
NullEquality::NullEqualsNull,
NullEquality::NullEqualsNothing,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
2 changes: 1 addition & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -236,7 +236,7 @@ pub fn hash_join_exec(
join_type,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNull,
NullEquality::NullEqualsNothing,
)?))
}

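Both test helpers above now construct their HashJoinExec with NullEquality::NullEqualsNothing rather than NullEquality::NullEqualsNull. As a quick reference for what the two variants mean for join-key matching, here is a minimal, self-contained Rust sketch; the enum is a local stand-in assumed to mirror DataFusion's NullEquality, not an import of the real type:

```rust
/// Local stand-in assumed to mirror DataFusion's `NullEquality`; sketch only.
#[derive(Clone, Copy, PartialEq, Debug)]
enum NullEquality {
    /// NULL join keys match nothing (plain `=` semantics).
    NullEqualsNothing,
    /// NULL join keys match other NULLs (`IS NOT DISTINCT FROM` semantics).
    NullEqualsNull,
}

/// Decide whether two nullable join keys match under the chosen semantics.
fn keys_match(left: Option<i64>, right: Option<i64>, null_equality: NullEquality) -> bool {
    match (left, right) {
        (Some(l), Some(r)) => l == r,
        (None, None) => matches!(null_equality, NullEquality::NullEqualsNull),
        _ => false,
    }
}

fn main() {
    assert!(keys_match(Some(1), Some(1), NullEquality::NullEqualsNothing));
    // NULL = NULL only counts as a match when NullEqualsNull is requested.
    assert!(!keys_match(None, None, NullEquality::NullEqualsNothing));
    assert!(keys_match(None, None, NullEquality::NullEqualsNull));
}
```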
19 changes: 17 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -725,6 +725,12 @@ impl DisplayAs for HashJoinExec {
} else {
"".to_string()
};
let display_null_equality =
if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
", Null Equality: NULL equals NULL"
} else {
""
};
let on = self
.on
.iter()
@@ -733,8 +739,13 @@ impl DisplayAs for HashJoinExec {
.join(", ");
write!(
f,
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}",
self.mode, self.join_type, on, display_filter, display_projections,
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
self.mode,
self.join_type,
on,
display_filter,
display_projections,
display_null_equality,
)
}
DisplayFormatType::TreeRender => {
@@ -753,6 +764,10 @@ impl DisplayAs for HashJoinExec {

writeln!(f, "on={on}")?;

if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
writeln!(f, "Null Equality: NULL equals NULL")?;
}

if let Some(filter) = self.filter.as_ref() {
writeln!(f, "filter={filter}")?;
}
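The Default-format branch above appends the null-equality note only when the operator was built with NullEqualsNull, so plans using the default NullEqualsNothing render exactly as before. A self-contained sketch of that conditional-suffix pattern (simplified: the filter and projection suffixes are omitted, and the enum is a local stand-in, not DataFusion's type):

```rust
#[derive(Clone, Copy, PartialEq)]
enum NullEquality {
    NullEqualsNothing,
    NullEqualsNull,
}

/// Build the single-line (indent-format) description of a hash join,
/// adding the null-equality note only for the non-default variant.
fn describe_hash_join(mode: &str, join_type: &str, on: &str, null_equality: NullEquality) -> String {
    let display_null_equality = if matches!(null_equality, NullEquality::NullEqualsNull) {
        ", Null Equality: NULL equals NULL"
    } else {
        ""
    };
    format!("HashJoinExec: mode={mode}, join_type={join_type}, on=[{on}]{display_null_equality}")
}

fn main() {
    // Mirrors the expected line added to join_is_not_distinct_from.slt below.
    println!(
        "{}",
        describe_hash_join("CollectLeft", "Inner", "(val@1, val@1)", NullEquality::NullEqualsNull)
    );
    // Default semantics: the output is unchanged from the previous format.
    println!(
        "{}",
        describe_hash_join("CollectLeft", "Inner", "(val@1, val@1)", NullEquality::NullEqualsNothing)
    );
}
```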
19 changes: 16 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -351,15 +351,22 @@ impl DisplayAs for SortMergeJoinExec {
.map(|(c1, c2)| format!("({c1}, {c2})"))
.collect::<Vec<String>>()
.join(", ");
let display_null_equality =
if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
", Null Equality: NULL equals NULL"
} else {
""
};
write!(
f,
"SortMergeJoin: join_type={:?}, on=[{}]{}",
"SortMergeJoin: join_type={:?}, on=[{}]{}{}",
self.join_type,
on,
self.filter.as_ref().map_or("".to_string(), |f| format!(
", filter={}",
f.expression()
))
)),
display_null_equality,
)
}
DisplayFormatType::TreeRender => {
Expand All @@ -375,7 +382,13 @@ impl DisplayAs for SortMergeJoinExec {
if self.join_type() != JoinType::Inner {
writeln!(f, "join_type={:?}", self.join_type)?;
}
writeln!(f, "on={on}")
writeln!(f, "on={on}")?;

if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
writeln!(f, "Null Equality: NULL equals NULL")?;
}

Ok(())
}
}
}
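SortMergeJoinExec follows the same convention: with the default indent format, a join built with NullEqualsNull would render along the lines of `SortMergeJoin: join_type=Inner, on=[(a@0, b@0)], Null Equality: NULL equals NULL` (the key names here are illustrative), while joins using the default NullEqualsNothing keep their previous one-line output.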
@@ -302,4 +302,4 @@ CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-test
statement error DataFusion error: SQL error: ParserError\("'IF NOT EXISTS' cannot coexist with 'REPLACE'"\)
CREATE OR REPLACE EXTERNAL TABLE IF NOT EXISTS t_conflict(c1 int)
STORED AS CSV
LOCATION 'foo.csv';
LOCATION 'foo.csv';
54 changes: 48 additions & 6 deletions datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt
@@ -81,10 +81,52 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)]
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)], Null Equality: NULL equals NULL
04)------DataSourceExec: partitions=1, partition_sizes=[1]
05)------DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.explain.format = "tree";

# Tree explain should highlight null equality semantics
query TT
EXPLAIN SELECT t1.id AS t1_id, t2.id AS t2_id, t1.val, t2.val
FROM t1
JOIN t2 ON t1.val IS NOT DISTINCT FROM t2.val
----
physical_plan
01)┌───────────────────────────┐
02)│ ProjectionExec │
03)│ -------------------- │
04)│ t1_id: id │
05)│ t2_id: id │
06)│ val: val │
07)└─────────────┬─────────────┘
08)┌─────────────┴─────────────┐
09)│ CoalesceBatchesExec │
10)│ -------------------- │
11)│ target_batch_size: │
12)│ 8192 │
13)└─────────────┬─────────────┘
14)┌─────────────┴─────────────┐
15)│ HashJoinExec │
16)│ -------------------- │
17)│ Null Equality: NULL equals│
18)│ NULL ├──────────────┐
19)│ │ │
20)│ on: (val = val) │ │
21)└─────────────┬─────────────┘ │
22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
23)│ DataSourceExec ││ DataSourceExec │
24)│ -------------------- ││ -------------------- │
25)│ bytes: 288 ││ bytes: 288 │
26)│ format: memory ││ format: memory │
27)│ rows: 1 ││ rows: 1 │
28)└───────────────────────────┘└───────────────────────────┘

statement ok
set datafusion.explain.format = "indent";

# For nested expression comparison, it should still be able to be converted to a Hash Join
query IIII rowsort
SELECT t1.id AS t1_id, t2.id AS t2_id, t1.val, t2.val
@@ -108,7 +150,7 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], projection=[id@0, val@1, id@3, val@4]
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], projection=[id@0, val@1, id@3, val@4], Null Equality: NULL equals NULL
04)------CoalescePartitionsExec
05)--------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
@@ -139,7 +181,7 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], filter=CAST(val@0 AS Int64) % 3 IS DISTINCT FROM CAST(val@1 AS Int64) % 3, projection=[id@0, val@1, id@3, val@4]
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], filter=CAST(val@0 AS Int64) % 3 IS DISTINCT FROM CAST(val@1 AS Int64) % 3, projection=[id@0, val@1, id@3, val@4], Null Equality: NULL equals NULL
04)------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)]
05)--------DataSourceExec: partitions=1, partition_sizes=[1]
06)------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t2.val + Int64(1)]
@@ -201,11 +243,11 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@0, val@1)], projection=[id@1, val@2, id@3, val@4]
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@0, val@1)], projection=[id@1, val@2, id@3, val@4], Null Equality: NULL equals NULL
04)------DataSourceExec: partitions=1, partition_sizes=[1]
05)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)--------CoalesceBatchesExec: target_batch_size=8192
07)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)]
07)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)], Null Equality: NULL equals NULL
08)------------DataSourceExec: partitions=1, partition_sizes=[1]
09)------------DataSourceExec: partitions=1, partition_sizes=[1]

@@ -246,7 +288,7 @@ JOIN t4 ON (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT F
01)ProjectionExec: expr=[id@0 as t3_id, id@3 as t4_id, val1@1 as val1, val1@4 as val1, val2@2 as val2, val2@5 as val2]
02)--CoalesceBatchesExec: target_batch_size=8192
02)--Inner Join: t3.val1 = t4.val1, t3.val2 = t4.val2
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val1@1, val1@1), (val2@2, val2@2)]
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val1@1, val1@1), (val2@2, val2@2)], Null Equality: NULL equals NULL
03)----TableScan: t3 projection=[id, val1, val2]
04)------DataSourceExec: partitions=1, partition_sizes=[1]
04)----TableScan: t4 projection=[id, val1, val2]
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/union.slt
@@ -308,7 +308,7 @@ logical_plan
physical_plan
01)UnionExec
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)]
03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)], Null Equality: NULL equals NULL
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=2
@@ -321,7 +321,7 @@ physical_plan
13)----------DataSourceExec: partitions=1, partition_sizes=[1]
14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
15)----CoalesceBatchesExec: target_batch_size=2
16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1]
16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1], Null Equality: NULL equals NULL
17)--------CoalescePartitionsExec
18)----------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
19)------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
@@ -378,7 +378,7 @@ logical_plan
physical_plan
01)UnionExec
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)]
03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)], Null Equality: NULL equals NULL
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=2
@@ -389,7 +389,7 @@ physical_plan
11)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)--------DataSourceExec: partitions=1, partition_sizes=[1]
13)--CoalesceBatchesExec: target_batch_size=2
14)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)]
14)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)], Null Equality: NULL equals NULL
15)------CoalescePartitionsExec
16)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
17)----------CoalesceBatchesExec: target_batch_size=2