diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 0a75d9f52e64..c51a5e02c9c3 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -1281,7 +1281,7 @@ fn test_hash_join_after_projection() -> Result<()> { &JoinType::Inner, None, PartitionMode::Auto, - NullEquality::NullEqualsNull, + NullEquality::NullEqualsNothing, )?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 7c9fb9de5340..f1154c98d3da 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -236,7 +236,7 @@ pub fn hash_join_exec( join_type, None, PartitionMode::Partitioned, - NullEquality::NullEqualsNull, + NullEquality::NullEqualsNothing, )?)) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c8ed1960393c..2f183474afb6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -725,6 +725,12 @@ impl DisplayAs for HashJoinExec { } else { "".to_string() }; + let display_null_equality = + if matches!(self.null_equality(), NullEquality::NullEqualsNull) { + ", Null Equality: NULL equals NULL" + } else { + "" + }; let on = self .on .iter() @@ -733,8 +739,13 @@ impl DisplayAs for HashJoinExec { .join(", "); write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", - self.mode, self.join_type, on, display_filter, display_projections, + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", + self.mode, + self.join_type, + on, + display_filter, + display_projections, + display_null_equality, ) } DisplayFormatType::TreeRender => { @@ -753,6 +764,10 @@ impl DisplayAs for HashJoinExec { writeln!(f, "on={on}")?; + if matches!(self.null_equality(), NullEquality::NullEqualsNull) { + writeln!(f, "Null Equality: NULL equals NULL")?; + } + if let Some(filter) = self.filter.as_ref() { writeln!(f, "filter={filter}")?; } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 3ee8bf5260ae..963542c54035 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -351,15 +351,22 @@ impl DisplayAs for SortMergeJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); + let display_null_equality = + if matches!(self.null_equality(), NullEquality::NullEqualsNull) { + ", Null Equality: NULL equals NULL" + } else { + "" + }; write!( f, - "SortMergeJoin: join_type={:?}, on=[{}]{}", + "SortMergeJoin: join_type={:?}, on=[{}]{}{}", self.join_type, on, self.filter.as_ref().map_or("".to_string(), |f| format!( ", filter={}", f.expression() - )) + )), + display_null_equality, ) } DisplayFormatType::TreeRender => { @@ -375,7 +382,13 @@ impl DisplayAs for SortMergeJoinExec { if self.join_type() != JoinType::Inner { writeln!(f, "join_type={:?}", self.join_type)?; } - writeln!(f, "on={on}") + writeln!(f, "on={on}")?; + + if matches!(self.null_equality(), NullEquality::NullEqualsNull) { + writeln!(f, "Null Equality: NULL equals NULL")?; + } + + Ok(()) } } } diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 4a803c981a92..1e6183f48bac 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -302,4 +302,4 @@ CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-test statement error DataFusion error: SQL error: ParserError\("'IF NOT EXISTS' cannot coexist with 'REPLACE'"\) CREATE OR REPLACE EXTERNAL TABLE IF NOT EXISTS t_conflict(c1 int) STORED AS CSV -LOCATION 'foo.csv'; \ No newline at end of file +LOCATION 'foo.csv'; diff --git a/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt b/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt index fa8b9950adf8..2171bcd972ec 100644 --- a/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt +++ b/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt @@ -81,10 +81,52 @@ logical_plan physical_plan 01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val] 02)--CoalesceBatchesExec: target_batch_size=8192 -03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)] +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)], Null Equality: NULL equals NULL 04)------DataSourceExec: partitions=1, partition_sizes=[1] 05)------DataSourceExec: partitions=1, partition_sizes=[1] +statement ok +set datafusion.explain.format = "tree"; + +# Tree explain should highlight null equality semantics +query TT +EXPLAIN SELECT t1.id AS t1_id, t2.id AS t2_id, t1.val, t2.val +FROM t1 +JOIN t2 ON t1.val IS NOT DISTINCT FROM t2.val +---- +physical_plan +01)┌───────────────────────────┐ +02)│ ProjectionExec │ +03)│ -------------------- │ +04)│ t1_id: id │ +05)│ t2_id: id │ +06)│ val: val │ +07)└─────────────┬─────────────┘ +08)┌─────────────┴─────────────┐ +09)│ CoalesceBatchesExec │ +10)│ -------------------- │ +11)│ target_batch_size: │ +12)│ 8192 │ +13)└─────────────┬─────────────┘ +14)┌─────────────┴─────────────┐ +15)│ HashJoinExec │ +16)│ -------------------- │ +17)│ Null Equality: NULL equals│ +18)│ NULL ├──────────────┐ +19)│ │ │ +20)│ on: (val = val) │ │ +21)└─────────────┬─────────────┘ │ +22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ +23)│ DataSourceExec ││ DataSourceExec │ +24)│ -------------------- ││ -------------------- │ +25)│ bytes: 288 ││ bytes: 288 │ +26)│ format: memory ││ format: memory │ +27)│ rows: 1 ││ rows: 1 │ +28)└───────────────────────────┘└───────────────────────────┘ + +statement ok +set datafusion.explain.format = "indent"; + # For nested expression comparision, it should still able to be converted to Hash Join query IIII rowsort SELECT t1.id AS t1_id, t2.id AS t2_id, t1.val, t2.val @@ -108,7 +150,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val] 02)--CoalesceBatchesExec: target_batch_size=8192 -03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], projection=[id@0, val@1, id@3, val@4] +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], projection=[id@0, val@1, id@3, val@4], Null Equality: NULL equals NULL 04)------CoalescePartitionsExec 05)--------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)] 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 @@ -139,7 +181,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val] 02)--CoalesceBatchesExec: target_batch_size=8192 -03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], filter=CAST(val@0 AS Int64) % 3 IS DISTINCT FROM CAST(val@1 AS Int64) % 3, projection=[id@0, val@1, id@3, val@4] +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], filter=CAST(val@0 AS Int64) % 3 IS DISTINCT FROM CAST(val@1 AS Int64) % 3, projection=[id@0, val@1, id@3, val@4], Null Equality: NULL equals NULL 04)------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)] 05)--------DataSourceExec: partitions=1, partition_sizes=[1] 06)------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t2.val + Int64(1)] @@ -201,11 +243,11 @@ logical_plan physical_plan 01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val] 02)--CoalesceBatchesExec: target_batch_size=8192 -03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@0, val@1)], projection=[id@1, val@2, id@3, val@4] +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@0, val@1)], projection=[id@1, val@2, id@3, val@4], Null Equality: NULL equals NULL 04)------DataSourceExec: partitions=1, partition_sizes=[1] 05)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)--------CoalesceBatchesExec: target_batch_size=8192 -07)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)] +07)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val@1, val@1)], Null Equality: NULL equals NULL 08)------------DataSourceExec: partitions=1, partition_sizes=[1] 09)------------DataSourceExec: partitions=1, partition_sizes=[1] @@ -246,7 +288,7 @@ JOIN t4 ON (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT F 01)ProjectionExec: expr=[id@0 as t3_id, id@3 as t4_id, val1@1 as val1, val1@4 as val1, val2@2 as val2, val2@5 as val2] 02)--CoalesceBatchesExec: target_batch_size=8192 02)--Inner Join: t3.val1 = t4.val1, t3.val2 = t4.val2 -03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val1@1, val1@1), (val2@2, val2@2)] +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(val1@1, val1@1), (val2@2, val2@2)], Null Equality: NULL equals NULL 03)----TableScan: t3 projection=[id, val1, val2] 04)------DataSourceExec: partitions=1, partition_sizes=[1] 04)----TableScan: t4 projection=[id, val1, val2] diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 996ba0d70a63..ef4f78087f45 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -308,7 +308,7 @@ logical_plan physical_plan 01)UnionExec 02)--CoalesceBatchesExec: target_batch_size=2 -03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)] +03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)], Null Equality: NULL equals NULL 04)------CoalescePartitionsExec 05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[] 06)----------CoalesceBatchesExec: target_batch_size=2 @@ -321,7 +321,7 @@ physical_plan 13)----------DataSourceExec: partitions=1, partition_sizes=[1] 14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name] 15)----CoalesceBatchesExec: target_batch_size=2 -16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1] +16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1], Null Equality: NULL equals NULL 17)--------CoalescePartitionsExec 18)----------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)] 19)------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[] @@ -378,7 +378,7 @@ logical_plan physical_plan 01)UnionExec 02)--CoalesceBatchesExec: target_batch_size=2 -03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)] +03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)], Null Equality: NULL equals NULL 04)------CoalescePartitionsExec 05)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[] 06)----------CoalesceBatchesExec: target_batch_size=2 @@ -389,7 +389,7 @@ physical_plan 11)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 12)--------DataSourceExec: partitions=1, partition_sizes=[1] 13)--CoalesceBatchesExec: target_batch_size=2 -14)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)] +14)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, name@0)], Null Equality: NULL equals NULL 15)------CoalescePartitionsExec 16)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[] 17)----------CoalesceBatchesExec: target_batch_size=2