Skip to content

Commit 7f90fc4

Browse files
authored
chore: Finish refactoring expression serde out of QueryPlanSerde (#2791)
1 parent eef5f28 commit 7f90fc4

File tree

433 files changed

+36812
-40420
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

433 files changed

+36812
-40420
lines changed

docs/source/user-guide/latest/configs.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ These settings can be used to determine which parts of the plan are accelerated
214214
| `spark.comet.expression.BitwiseNot.enabled` | Enable Comet acceleration for `BitwiseNot` | true |
215215
| `spark.comet.expression.BitwiseOr.enabled` | Enable Comet acceleration for `BitwiseOr` | true |
216216
| `spark.comet.expression.BitwiseXor.enabled` | Enable Comet acceleration for `BitwiseXor` | true |
217+
| `spark.comet.expression.BloomFilterMightContain.enabled` | Enable Comet acceleration for `BloomFilterMightContain` | true |
217218
| `spark.comet.expression.CaseWhen.enabled` | Enable Comet acceleration for `CaseWhen` | true |
218219
| `spark.comet.expression.Cast.enabled` | Enable Comet acceleration for `Cast` | true |
219220
| `spark.comet.expression.Ceil.enabled` | Enable Comet acceleration for `Ceil` | true |
@@ -258,6 +259,7 @@ These settings can be used to determine which parts of the plan are accelerated
258259
| `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true |
259260
| `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true |
260261
| `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true |
262+
| `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true |
261263
| `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true |
262264
| `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true |
263265
| `spark.comet.expression.LessThanOrEqual.enabled` | Enable Comet acceleration for `LessThanOrEqual` | true |
@@ -267,6 +269,7 @@ These settings can be used to determine which parts of the plan are accelerated
267269
| `spark.comet.expression.Log10.enabled` | Enable Comet acceleration for `Log10` | true |
268270
| `spark.comet.expression.Log2.enabled` | Enable Comet acceleration for `Log2` | true |
269271
| `spark.comet.expression.Lower.enabled` | Enable Comet acceleration for `Lower` | true |
272+
| `spark.comet.expression.MakeDecimal.enabled` | Enable Comet acceleration for `MakeDecimal` | true |
270273
| `spark.comet.expression.MapEntries.enabled` | Enable Comet acceleration for `MapEntries` | true |
271274
| `spark.comet.expression.MapFromArrays.enabled` | Enable Comet acceleration for `MapFromArrays` | true |
272275
| `spark.comet.expression.MapKeys.enabled` | Enable Comet acceleration for `MapKeys` | true |
@@ -289,6 +292,7 @@ These settings can be used to determine which parts of the plan are accelerated
289292
| `spark.comet.expression.Remainder.enabled` | Enable Comet acceleration for `Remainder` | true |
290293
| `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true |
291294
| `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true |
295+
| `spark.comet.expression.ScalarSubquery.enabled` | Enable Comet acceleration for `ScalarSubquery` | true |
292296
| `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true |
293297
| `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true |
294298
| `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true |
@@ -320,6 +324,7 @@ These settings can be used to determine which parts of the plan are accelerated
320324
| `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true |
321325
| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
322326
| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
327+
| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
323328
| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
324329
| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |
325330
| `spark.comet.expression.WeekOfYear.enabled` | Enable Comet acceleration for `WeekOfYear` | true |
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, BloomFilterMightContain}
23+
24+
import org.apache.comet.CometSparkSessionExtensions.withInfo
25+
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
26+
27+
/**
 * Serde handler for Spark's `BloomFilterMightContain` expression.
 *
 * Serializes both children (the bloom-filter binary and the probed value) and, when both
 * succeed, emits a `BloomFilterMightContain` protobuf expression. When either child fails to
 * serialize, the failure reason is attached to the Spark expression via `withInfo` and `None`
 * is returned so the plan falls back to Spark.
 */
object CometBloomFilterMightContain extends CometExpressionSerde[BloomFilterMightContain] {

  override def convert(
      expr: BloomFilterMightContain,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // Serialize both children eagerly so that diagnostic info (withInfo) is collected for
    // each unsupported child, not just the first one encountered.
    val filterProto = exprToProtoInternal(expr.left, inputs, binding)
    val probeProto = exprToProtoInternal(expr.right, inputs, binding)
    (filterProto, probeProto) match {
      case (Some(filter), Some(probe)) =>
        val mightContain = ExprOuterClass.BloomFilterMightContain
          .newBuilder()
          .setBloomFilter(filter)
          .setValue(probe)
        Some(
          ExprOuterClass.Expr
            .newBuilder()
            .setBloomFilterMightContain(mightContain)
            .build())
      case _ =>
        // At least one child could not be converted; record why on the original expression.
        withInfo(expr, expr.left, expr.right)
        None
    }
  }
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.Attribute
23+
import org.apache.spark.sql.execution.ScalarSubquery
24+
25+
import org.apache.comet.CometSparkSessionExtensions.withInfo
26+
import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}
27+
28+
/**
 * Serde handler for Spark's `ScalarSubquery` execution expression.
 *
 * A scalar subquery is serialized by its expression id and result data type only; the native
 * side looks the subquery result up by id at execution time. Returns `None` (with a reason
 * recorded via `withInfo`) when the result type is unsupported or cannot be serialized.
 */
object CometScalarSubquery extends CometExpressionSerde[ScalarSubquery] {

  override def convert(
      expr: ScalarSubquery,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    if (!supportedDataType(expr.dataType)) {
      withInfo(expr, s"Unsupported data type: ${expr.dataType}")
      None
    } else {
      serializeDataType(expr.dataType) match {
        case Some(protoType) =>
          val subquery = ExprOuterClass.Subquery
            .newBuilder()
            .setId(expr.exprId.id)
            .setDatatype(protoType)
          Some(ExprOuterClass.Expr.newBuilder().setSubquery(subquery).build())
        case None =>
          withInfo(expr, s"Failed to serialize datatype ${expr.dataType} for scalar subquery")
          None
      }
    }
  }
}

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 9 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ import org.apache.spark.internal.Logging
2525
import org.apache.spark.sql.catalyst.expressions._
2626
import org.apache.spark.sql.catalyst.expressions.aggregate._
2727
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
28-
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
29-
import org.apache.spark.sql.comet._
30-
import org.apache.spark.sql.execution
31-
import org.apache.spark.sql.execution._
28+
import org.apache.spark.sql.comet.DecimalPrecision
29+
import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan}
3230
import org.apache.spark.sql.internal.SQLConf
3331
import org.apache.spark.sql.types._
3432

@@ -153,7 +151,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
153151
classOf[Lower] -> CometLower,
154152
classOf[OctetLength] -> CometScalarFunction("octet_length"),
155153
classOf[RegExpReplace] -> CometRegExpReplace,
156-
classOf[Reverse] -> CometScalarFunction("reverse"),
154+
classOf[Reverse] -> CometReverse,
157155
classOf[RLike] -> CometRLike,
158156
classOf[StartsWith] -> CometScalarFunction("starts_with"),
159157
classOf[StringInstr] -> CometScalarFunction("instr"),
@@ -203,21 +201,20 @@ object QueryPlanSerde extends Logging with CometExprShim {
203201

204202
private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
205203
// TODO PromotePrecision
206-
// TODO KnownFloatingPointNormalized
207-
// TODO ScalarSubquery
208-
// TODO UnscaledValue
209-
// TODO MakeDecimal
210-
// TODO BloomFilterMightContain
211-
// TODO RegExpReplace
212204
classOf[Alias] -> CometAlias,
213205
classOf[AttributeReference] -> CometAttributeReference,
206+
classOf[BloomFilterMightContain] -> CometBloomFilterMightContain,
214207
classOf[CheckOverflow] -> CometCheckOverflow,
215208
classOf[Coalesce] -> CometCoalesce,
209+
classOf[KnownFloatingPointNormalized] -> CometKnownFloatingPointNormalized,
216210
classOf[Literal] -> CometLiteral,
211+
classOf[MakeDecimal] -> CometMakeDecimal,
217212
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
213+
classOf[ScalarSubquery] -> CometScalarSubquery,
218214
classOf[SparkPartitionID] -> CometSparkPartitionId,
219215
classOf[SortOrder] -> CometSortOrder,
220-
classOf[StaticInvoke] -> CometStaticInvoke)
216+
classOf[StaticInvoke] -> CometStaticInvoke,
217+
classOf[UnscaledValue] -> CometUnscaledValue)
221218

222219
/**
223220
* Mapping of Spark expression class to Comet expression handler.
@@ -541,74 +538,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
541538
// `PromotePrecision` is just a wrapper, don't need to serialize it.
542539
exprToProtoInternal(child, inputs, binding)
543540

544-
case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) =>
545-
val dataType = serializeDataType(expr.dataType)
546-
if (dataType.isEmpty) {
547-
withInfo(expr, s"Unsupported datatype ${expr.dataType}")
548-
return None
549-
}
550-
val ex = exprToProtoInternal(expr, inputs, binding)
551-
ex.map { child =>
552-
val builder = ExprOuterClass.NormalizeNaNAndZero
553-
.newBuilder()
554-
.setChild(child)
555-
.setDatatype(dataType.get)
556-
ExprOuterClass.Expr.newBuilder().setNormalizeNanAndZero(builder).build()
557-
}
558-
559-
case s @ execution.ScalarSubquery(_, _) =>
560-
if (supportedDataType(s.dataType)) {
561-
val dataType = serializeDataType(s.dataType)
562-
if (dataType.isEmpty) {
563-
withInfo(s, s"Scalar subquery returns unsupported datatype ${s.dataType}")
564-
return None
565-
}
566-
567-
val builder = ExprOuterClass.Subquery
568-
.newBuilder()
569-
.setId(s.exprId.id)
570-
.setDatatype(dataType.get)
571-
Some(ExprOuterClass.Expr.newBuilder().setSubquery(builder).build())
572-
} else {
573-
withInfo(s, s"Unsupported data type: ${s.dataType}")
574-
None
575-
}
576-
577-
case UnscaledValue(child) =>
578-
val childExpr = exprToProtoInternal(child, inputs, binding)
579-
val optExpr =
580-
scalarFunctionExprToProtoWithReturnType("unscaled_value", LongType, false, childExpr)
581-
optExprWithInfo(optExpr, expr, child)
582-
583-
case MakeDecimal(child, precision, scale, true) =>
584-
val childExpr = exprToProtoInternal(child, inputs, binding)
585-
val optExpr = scalarFunctionExprToProtoWithReturnType(
586-
"make_decimal",
587-
DecimalType(precision, scale),
588-
false,
589-
childExpr)
590-
optExprWithInfo(optExpr, expr, child)
591-
592-
case b @ BloomFilterMightContain(_, _) =>
593-
val bloomFilter = b.left
594-
val value = b.right
595-
val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs, binding)
596-
val valueExpr = exprToProtoInternal(value, inputs, binding)
597-
if (bloomFilterExpr.isDefined && valueExpr.isDefined) {
598-
val builder = ExprOuterClass.BloomFilterMightContain.newBuilder()
599-
builder.setBloomFilter(bloomFilterExpr.get)
600-
builder.setValue(valueExpr.get)
601-
Some(
602-
ExprOuterClass.Expr
603-
.newBuilder()
604-
.setBloomFilterMightContain(builder)
605-
.build())
606-
} else {
607-
withInfo(expr, bloomFilter, value)
608-
None
609-
}
610-
case r @ Reverse(child) if child.dataType.isInstanceOf[ArrayType] =>
611-
convert(r, CometArrayReverse)
612541
case expr =>
613542
QueryPlanSerde.exprSerdeMap.get(expr.getClass) match {
614543
case Some(handler) =>
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Reverse}
23+
import org.apache.spark.sql.types.ArrayType
24+
25+
import org.apache.comet.serde.ExprOuterClass.Expr
26+
27+
/**
 * Serde handler for Spark's `Reverse` expression.
 *
 * `Reverse` is overloaded in Spark: on strings it reverses characters, on arrays it reverses
 * elements. String inputs map to the native `reverse` scalar function (via the
 * `CometScalarFunction` superclass); array inputs delegate to [[CometArrayReverse]] for both
 * support-level checks and conversion.
 */
object CometReverse extends CometScalarFunction[Reverse]("reverse") {

  override def getSupportLevel(expr: Reverse): SupportLevel = expr.child.dataType match {
    case _: ArrayType => CometArrayReverse.getSupportLevel(expr)
    case _ => Compatible()
  }

  override def convert(expr: Reverse, inputs: Seq[Attribute], binding: Boolean): Option[Expr] =
    expr.child.dataType match {
      case _: ArrayType => CometArrayReverse.convert(expr, inputs, binding)
      case _ => super.convert(expr, inputs, binding)
    }
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, KnownFloatingPointNormalized}
23+
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
24+
25+
import org.apache.comet.CometSparkSessionExtensions.withInfo
26+
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, serializeDataType}
27+
28+
/**
 * Serde handler for Spark's `KnownFloatingPointNormalized` marker expression.
 *
 * Only the `KnownFloatingPointNormalized(NormalizeNaNAndZero(child))` shape is supported: the
 * wrapped child is serialized and emitted as a native `NormalizeNaNAndZero` protobuf
 * expression. Any other child shape is rejected via `getSupportLevel` and, defensively, by
 * `convert` itself.
 */
object CometKnownFloatingPointNormalized
    extends CometExpressionSerde[KnownFloatingPointNormalized] {

  override def getSupportLevel(expr: KnownFloatingPointNormalized): SupportLevel = {
    expr.child match {
      case _: NormalizeNaNAndZero => Compatible()
      case _ =>
        Unsupported(
          Some(
            "KnownFloatingPointNormalized only supports NormalizeNaNAndZero child expressions"))
    }
  }

  override def convert(
      expr: KnownFloatingPointNormalized,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    expr.child match {
      case NormalizeNaNAndZero(wrapped) =>
        serializeDataType(wrapped.dataType) match {
          case Some(protoType) =>
            val optExpr = exprToProtoInternal(wrapped, inputs, binding).map { child =>
              val builder = ExprOuterClass.NormalizeNaNAndZero
                .newBuilder()
                .setChild(child)
                .setDatatype(protoType)
              ExprOuterClass.Expr.newBuilder().setNormalizeNanAndZero(builder).build()
            }
            optExprWithInfo(optExpr, expr, wrapped)
          case None =>
            withInfo(wrapped, s"Unsupported datatype ${wrapped.dataType}")
            None
        }
      case other =>
        // Defensive fallback: getSupportLevel should prevent this path, but pattern matching
        // here avoids the unchecked asInstanceOf cast, which would throw ClassCastException
        // if convert were ever reached with a different child shape.
        withInfo(expr, s"Unsupported child expression: $other")
        None
    }
  }
}

0 commit comments

Comments
 (0)