From 610afe75c8eb84ef2c7caa8fdf9da47c0d8aa33d Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 28 Oct 2025 15:08:33 +0400 Subject: [PATCH 1/9] WIP --- .../scala/org/apache/comet/CometConf.scala | 2 + docs/source/user-guide/latest/configs.md | 1 + .../apache/comet/rules/CometExecRule.scala | 71 ++++++---------- .../apache/comet/serde/QueryPlanSerde.scala | 6 +- .../serde/operator/CometLocalTableScan.scala | 48 +++++++++++ .../serde/{ => operator}/CometProject.scala | 3 +- .../serde/{ => operator}/CometSort.scala | 3 +- .../sql/comet/CometLocalTableScanExec.scala | 82 +++++++++++++++++++ 8 files changed, 169 insertions(+), 47 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala rename spark/src/main/scala/org/apache/comet/serde/{ => operator}/CometProject.scala (94%) rename spark/src/main/scala/org/apache/comet/serde/{ => operator}/CometSort.scala (94%) create mode 100644 spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 7b55932c71..7564aef2c6 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -254,6 +254,8 @@ object CometConf extends ShimCometConf { createExecEnabledConfig("window", defaultValue = true) val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("takeOrderedAndProject", defaultValue = true) + val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] = + createExecEnabledConfig("localTableScan", defaultValue = true) val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index bc8fdcb982..5e904940ba 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -148,6 +148,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | | `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true | +| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | true | | `spark.comet.exec.project.enabled` | Whether to enable project by default. | true | | `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true | | `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true | diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 7a2f8a04f7..6358e5ccfe 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -78,63 +78,41 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { /** * Tries to transform a Spark physical plan into a Comet plan. * - * This rule traverses bottom-up from the original Spark plan and for each plan node, there - * are a few cases to consider: + * This rule traverses bottom-up from the original Spark plan and for each plan node, there are + * a few cases to consider: * - * 1. 
The child(ren) of the current node `p` cannot be converted to native - * In this case, we'll simply return the original Spark plan, since Comet native - * execution cannot start from an arbitrary Spark operator (unless it is special node - * such as scan or sink such as shuffle exchange, union etc., which are wrapped by - * `CometScanWrapper` and `CometSinkPlaceHolder` respectively). + * 1. The child(ren) of the current node `p` cannot be converted to native In this case, we'll + * simply return the original Spark plan, since Comet native execution cannot start from an + * arbitrary Spark operator (unless it is special node such as scan or sink such as shuffle + * exchange, union etc., which are wrapped by `CometScanWrapper` and `CometSinkPlaceHolder` + * respectively). * - * 2. The child(ren) of the current node `p` can be converted to native - * There are two sub-cases for this scenario: 1) This node `p` can also be converted to - * native. In this case, we'll create a new native Comet operator for `p` and connect it with - * its previously converted child(ren); 2) This node `p` cannot be converted to native. In - * this case, similar to 1) above, we simply return `p` as it is. Its child(ren) would still - * be native Comet operators. + * 2. The child(ren) of the current node `p` can be converted to native There are two sub-cases + * for this scenario: 1) This node `p` can also be converted to native. In this case, we'll + * create a new native Comet operator for `p` and connect it with its previously converted + * child(ren); 2) This node `p` cannot be converted to native. In this case, similar to 1) + * above, we simply return `p` as it is. Its child(ren) would still be native Comet operators. * * After this rule finishes, we'll do another pass on the final plan to convert all adjacent - * Comet native operators into a single native execution block. Please see where - * `convertBlock` is called below. + * Comet native operators into a single native execution block. Please see where `convertBlock` + * is called below. 
* * Here are a few examples: * - * Scan ======> CometScan - * | | - * Filter CometFilter - * | | - * HashAggregate CometHashAggregate - * | | - * Exchange CometExchange - * | | - * HashAggregate CometHashAggregate - * | | - * UnsupportedOperator UnsupportedOperator + * Scan ======> CometScan \| | Filter CometFilter \| | HashAggregate CometHashAggregate \| | + * Exchange CometExchange \| | HashAggregate CometHashAggregate \| | UnsupportedOperator + * UnsupportedOperator * * Native execution doesn't necessarily have to start from `CometScan`: * - * Scan =======> CometScan - * | | - * UnsupportedOperator UnsupportedOperator - * | | - * HashAggregate HashAggregate - * | | - * Exchange CometExchange - * | | - * HashAggregate CometHashAggregate - * | | - * UnsupportedOperator UnsupportedOperator + * Scan =======> CometScan \| | UnsupportedOperator UnsupportedOperator \| | HashAggregate + * HashAggregate \| | Exchange CometExchange \| | HashAggregate CometHashAggregate \| | + * UnsupportedOperator UnsupportedOperator * * A sink can also be Comet operators other than `CometExchange`, for instance `CometUnion`: * - * Scan Scan =======> CometScan CometScan - * | | | | - * Filter Filter CometFilter CometFilter - * | | | | - * Union CometUnion - * | | - * Project CometProject + * Scan Scan =======> CometScan CometScan \| | | | Filter Filter CometFilter CometFilter \| | | + * \| Union CometUnion \| | Project CometProject */ // spotless:on private def transform(plan: SparkPlan): SparkPlan = { @@ -534,6 +512,11 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { s } + case op: LocalTableScanExec => + newPlanWithProto( + op, + CometLocalTableScanExec(_, op, op.rows, op.output, SerializedPlan(None))) + case op => op match { case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9f418e3068..c80f7a4e61 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -55,6 +55,7 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn import org.apache.comet.serde.Types.{DataType => ProtoDataType} import org.apache.comet.serde.Types.DataType._ import org.apache.comet.serde.literals.CometLiteral +import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort} import org.apache.comet.shims.CometExprShim /** @@ -66,7 +67,10 @@ object QueryPlanSerde extends Logging with CometExprShim { * Mapping of Spark operator class to Comet operator handler. 
*/ private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = - Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort) + Map( + classOf[ProjectExec] -> CometProject, + classOf[SortExec] -> CometSort, + classOf[LocalTableScanExec] -> CometLocalTableScan) private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[ArrayAppend] -> CometArrayAppend, diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala new file mode 100644 index 0000000000..ef270af4a4 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde.operator + +import scala.jdk.CollectionConverters.asJavaIterableConverter + +import org.apache.spark.sql.execution.LocalTableScanExec + +import org.apache.comet.{CometConf, ConfigEntry} +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} +import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.QueryPlanSerde.serializeDataType + +object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] { + + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED) + + override def convert( + op: LocalTableScanExec, + builder: Operator.Builder, + childOp: Operator*): Option[Operator] = { + val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType)) + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource(op.getClass.getSimpleName) + .addAllFields(scanTypes.asJava) + .setArrowFfiSafe(false) + Some(builder.setScan(scanBuilder).build()) + } +} diff --git a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala similarity index 94% rename from spark/src/main/scala/org/apache/comet/serde/CometProject.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala index 651aa8fefd..4ba02945d6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometProject.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.ProjectExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.exprToProto diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala similarity index 94% rename from spark/src/main/scala/org/apache/comet/serde/CometSort.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala index 2dec25c0dd..9928c39c40 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.SortExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{exprToProto, supportedSortType} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala new file mode 100644 index 0000000000..aad68c4c2a --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet + +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters +import org.apache.spark.sql.execution.LocalTableScanExec +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch + +import com.google.common.base.Objects + +import org.apache.comet.CometConf +import org.apache.comet.serde.OperatorOuterClass.Operator + +case class CometLocalTableScanExec( + override val nativeOp: Operator, + originalPlan: LocalTableScanExec, + @transient rows: Seq[InternalRow], + override val output: Seq[Attribute], + override val serializedPlanOpt: SerializedPlan) + extends CometLeafExec { + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + @transient private lazy val rdd: RDD[InternalRow] = { + if (rows.isEmpty) { + sparkContext.emptyRDD + } else { + val numSlices = math.min(rows.length, session.leafNodeDefaultParallelism) + sparkContext.parallelize(rows, numSlices) + } + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf) + val timeZoneId = conf.sessionLocalTimeZone + rdd.mapPartitionsInternal { sparkBatches => + val context = TaskContext.get() + CometArrowConverters.rowToArrowBatchIter( + sparkBatches, + schema, + maxRecordsPerBatch, + timeZoneId, + context) + } + } + + override def equals(obj: Any): Boolean = { + obj match { + case other: CometLocalTableScanExec => + this.originalPlan == other.originalPlan && + this.serializedPlanOpt == other.serializedPlanOpt + case _ => + false + } + } + + override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt) + +} From 1967a256bebb266d7a5cd0f6fb3571a6e19f1cd8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Fri, 31 Oct 2025 20:04:44 +0400 Subject: [PATCH 2/9] WIP --- .../serde/operator/CometLocalTableScan.scala | 2 +- .../sql/comet/CometLocalTableScanExec.scala | 18 +++++++++++++----- .../org/apache/spark/sql/comet/operators.scala | 1 - 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala index ef270af4a4..092f489e09 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala @@ -42,7 +42,7 @@ object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] { .newBuilder() .setSource(op.getClass.getSimpleName) .addAllFields(scanTypes.asJava) - .setArrowFfiSafe(false) + .setArrowFfiSafe(true) Some(builder.setScan(scanBuilder).build()) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index aad68c4c2a..62acdc024b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -22,7 +22,7 @@ package org.apache.spark.sql.comet import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import 
org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -44,12 +44,21 @@ case class CometLocalTableScanExec( override lazy val metrics: Map[String, SQLMetric] = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + @transient private lazy val unsafeRows: Array[InternalRow] = { + if (rows.isEmpty) { + Array.empty + } else { + val proj = UnsafeProjection.create(output, output) + rows.map(r => proj(r).copy()).toArray + } + } + @transient private lazy val rdd: RDD[InternalRow] = { if (rows.isEmpty) { sparkContext.emptyRDD } else { - val numSlices = math.min(rows.length, session.leafNodeDefaultParallelism) - sparkContext.parallelize(rows, numSlices) + val numSlices = math.min(unsafeRows.length, session.leafNodeDefaultParallelism) + sparkContext.parallelize(unsafeRows, numSlices) } } @@ -60,7 +69,7 @@ case class CometLocalTableScanExec( val context = TaskContext.get() CometArrowConverters.rowToArrowBatchIter( sparkBatches, - schema, + originalPlan.schema, maxRecordsPerBatch, timeZoneId, context) @@ -78,5 +87,4 @@ case class CometLocalTableScanExec( } override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt) - } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index de6892638a..6d02a05f73 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -298,7 +298,6 @@ abstract class CometNativeExec extends CometExec { if (containsBroadcastInput && firstNonBroadcastPlan.isEmpty) { throw new CometRuntimeException(s"Cannot find the first non broadcast plan: $this") } - // If the first non broadcast plan is found, we need to adjust the partition number of // the broadcast plans to make sure they have the same partition number as the first non // broadcast plan. 
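
A word of context before the next patch: a LocalTableScanExec is the physical leaf that Spark plans for a driver-side local relation, e.g. a Seq turned into a DataFrame, so that is exactly the query shape this series accelerates. The sketch below is not part of any patch; it only illustrates how the spark.comet.exec.localTableScan.enabled key introduced in patch 1/9 would be exercised, assuming Comet is on the classpath and registered via spark.plugins=org.apache.spark.CometPlugin as described in the Comet user guide. The object name LocalTableScanSketch is made up for illustration.

    import org.apache.spark.sql.SparkSession

    object LocalTableScanSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local[*]")
          // Comet must be on the classpath; plugin and exec settings per the Comet user guide.
          .config("spark.plugins", "org.apache.spark.CometPlugin")
          .config("spark.comet.exec.enabled", "true")
          // Key introduced in patch 1/9; gates conversion of LocalTableScanExec.
          .config("spark.comet.exec.localTableScan.enabled", "true")
          .getOrCreate()

        import spark.implicits._

        // A driver-side Seq plans as a LocalTableScanExec leaf; with the flag enabled,
        // CometExecRule should replace it with the new Comet operator.
        val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name").filter($"id" > 1)
        df.explain() // look for CometLocalTableScan in the physical plan
        df.show()

        spark.stop()
      }
    }
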
From 0c6a721675c7977814c7d0a4323232554835bcc9 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 2 Nov 2025 18:07:48 +0400 Subject: [PATCH 3/9] WIP --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 2 +- .../scala/org/apache/comet/serde/operator/CometSort.scala | 2 +- .../org/apache/spark/sql/comet/CometLocalTableScanExec.scala | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e5da357c7f..d6579b69d3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -55,7 +55,7 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn import org.apache.comet.serde.Types.{DataType => ProtoDataType} import org.apache.comet.serde.Types.DataType._ import org.apache.comet.serde.literals.CometLiteral -import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort} +import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder} import org.apache.comet.shims.CometExprShim /** diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala index a0f933ba3f..39a1c55656 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSort.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, Stru import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} +import org.apache.comet.serde.{CometExpressionSerde, CometOperatorSerde, Compatible, ExprOuterClass, Incompatible, OperatorOuterClass, SupportLevel} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 62acdc024b..9feebb6e07 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters -import org.apache.spark.sql.execution.LocalTableScanExec +import org.apache.spark.sql.execution.{InputRDDCodegen, LocalTableScanExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -76,6 +76,8 @@ case class CometLocalTableScanExec( } } + override def supportsColumnar: Boolean = true + override def equals(obj: Any): Boolean = { obj match { case other: CometLocalTableScanExec => From 1edc37ec6177fa0d65ee388574a28597fd03ac34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 8 Nov 2025 12:47:33 +0400 Subject: [PATCH 4/9] Add native CometLocalTableScanExec operator --- .../apache/comet/rules/CometExecRule.scala | 12 ++--- .../apache/comet/serde/QueryPlanSerde.scala | 3 ++ 
.../serde/operator/CometLocalTableScan.scala | 2 +- .../sql/comet/CometLocalTableScanExec.scala | 45 ++++++++++++++----- .../apache/spark/sql/comet/operators.scala | 2 +- 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 75ed6088f6..e6c168847d 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -227,7 +227,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { if (multiMode || sparkFinalMode) { op } else { - newPlanWithProto( + val plan1 = newPlanWithProto( op, nativeOp => { // The aggExprs could be empty. For example, if the aggregate functions only have @@ -247,6 +247,8 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { op.child, SerializedPlan(None)) }) + println(plan1) + plan1 } case op: ShuffledHashJoinExec @@ -512,10 +514,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { s } - case op: LocalTableScanExec => - newPlanWithProto( - op, - CometLocalTableScanExec(_, op, op.rows, op.output, SerializedPlan(None))) + case op: LocalTableScanExec if CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf) => + val nativeOp = QueryPlanSerde.operator2Proto(op) + val cometOp = CometLocalTableScanExec(op, op.rows, op.output) + CometScanWrapper(nativeOp.get, cometOp) case op => op match { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index d6579b69d3..da39fb44d3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1174,6 +1174,7 @@ object QueryPlanSerde extends Logging with CometExprShim { if (aggregate.isInstanceOf[HashAggregateExec] || aggregate.isInstanceOf[ObjectHashAggregateExec]) && CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) => + println("COMET_EXEC_AGGREGATE_ENABLED0") val groupingExpressions = aggregate.groupingExpressions val aggregateExpressions = aggregate.aggregateExpressions val aggregateAttributes = aggregate.aggregateAttributes @@ -1181,12 +1182,14 @@ object QueryPlanSerde extends Logging with CometExprShim { val child = aggregate.child if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) { + println("COMET_EXEC_AGGREGATE_ENABLED1") withInfo(op, "No group by or aggregation") return None } // Aggregate expressions with filter are not supported yet. 
if (aggregateExpressions.exists(_.filter.isDefined)) { + println("COMET_EXEC_AGGREGATE_ENABLED2") withInfo(op, "Aggregate expression with filter is not supported") return None } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala index 092f489e09..ef270af4a4 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala @@ -42,7 +42,7 @@ object CometLocalTableScan extends CometOperatorSerde[LocalTableScanExec] { .newBuilder() .setSource(op.getClass.getSimpleName) .addAllFields(scanTypes.asJava) - .setArrowFfiSafe(true) + .setArrowFfiSafe(false) Some(builder.setScan(scanBuilder).build()) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 9feebb6e07..4b0def6e7e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -23,23 +23,22 @@ import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} +import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters -import org.apache.spark.sql.execution.{InputRDDCodegen, LocalTableScanExec} +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.common.base.Objects import org.apache.comet.CometConf -import org.apache.comet.serde.OperatorOuterClass.Operator case class CometLocalTableScanExec( - override val nativeOp: Operator, originalPlan: LocalTableScanExec, @transient rows: Seq[InternalRow], - override val output: Seq[Attribute], - override val serializedPlanOpt: SerializedPlan) - extends CometLeafExec { + override val output: Seq[Attribute]) + extends CometExec + with LeafExecNode { override lazy val metrics: Map[String, SQLMetric] = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -63,16 +62,26 @@ case class CometLocalTableScanExec( } override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numInputRows = longMetric("numOutputRows") val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf) val timeZoneId = conf.sessionLocalTimeZone rdd.mapPartitionsInternal { sparkBatches => val context = TaskContext.get() - CometArrowConverters.rowToArrowBatchIter( + val batches = CometArrowConverters.rowToArrowBatchIter( sparkBatches, originalPlan.schema, maxRecordsPerBatch, timeZoneId, context) + createMetricsIterator(batches, numInputRows) + } + } + + override protected def stringArgs: Iterator[Any] = { + if (rows.isEmpty) { + Iterator("", output) + } else { + Iterator(output) } } @@ -81,12 +90,28 @@ case class CometLocalTableScanExec( override def equals(obj: Any): Boolean = { obj match { case other: CometLocalTableScanExec => - this.originalPlan == other.originalPlan && - this.serializedPlanOpt == other.serializedPlanOpt + this.originalPlan == other.originalPlan && this.schema == other.schema && this.output == other.output case _ => false } } - override 
def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt) + override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output) +} + +object CometLocalTableScanExec { + + private def createMetricsIterator( + it: Iterator[ColumnarBatch], + numInputRows: SQLMetric): Iterator[ColumnarBatch] = { + new Iterator[ColumnarBatch] { + override def hasNext: Boolean = it.hasNext + + override def next(): ColumnarBatch = { + val batch = it.next() + numInputRows.add(batch.numRows()) + batch + } + } + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 6d02a05f73..a5c4672fc7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -379,7 +379,7 @@ abstract class CometNativeExec extends CometExec { _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | - _: CometSparkToColumnarExec => + _: CometSparkToColumnarExec | _: CometLocalTableScanExec => func(plan) case _: CometPlan => // Other Comet operators, continue to traverse the tree. From 7797ac9c1222778a1d8665bb95ceff8c5298b0a7 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 8 Nov 2025 19:52:35 +0400 Subject: [PATCH 5/9] Some refactoring --- .../apache/comet/rules/CometExecRule.scala | 73 ++++++++++++------- .../apache/comet/serde/QueryPlanSerde.scala | 3 - .../sql/comet/CometLocalTableScanExec.scala | 4 +- 3 files changed, 51 insertions(+), 29 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index e6c168847d..52ea7faf06 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -78,41 +78,63 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { /** * Tries to transform a Spark physical plan into a Comet plan. * - * This rule traverses bottom-up from the original Spark plan and for each plan node, there are - * a few cases to consider: + * This rule traverses bottom-up from the original Spark plan and for each plan node, there + * are a few cases to consider: * - * 1. The child(ren) of the current node `p` cannot be converted to native In this case, we'll - * simply return the original Spark plan, since Comet native execution cannot start from an - * arbitrary Spark operator (unless it is special node such as scan or sink such as shuffle - * exchange, union etc., which are wrapped by `CometScanWrapper` and `CometSinkPlaceHolder` - * respectively). + * 1. The child(ren) of the current node `p` cannot be converted to native + * In this case, we'll simply return the original Spark plan, since Comet native + * execution cannot start from an arbitrary Spark operator (unless it is special node + * such as scan or sink such as shuffle exchange, union etc., which are wrapped by + * `CometScanWrapper` and `CometSinkPlaceHolder` respectively). * - * 2. The child(ren) of the current node `p` can be converted to native There are two sub-cases - * for this scenario: 1) This node `p` can also be converted to native. 
In this case, we'll - * create a new native Comet operator for `p` and connect it with its previously converted - * child(ren); 2) This node `p` cannot be converted to native. In this case, similar to 1) - * above, we simply return `p` as it is. Its child(ren) would still be native Comet operators. + * 2. The child(ren) of the current node `p` can be converted to native + * There are two sub-cases for this scenario: 1) This node `p` can also be converted to + * native. In this case, we'll create a new native Comet operator for `p` and connect it with + * its previously converted child(ren); 2) This node `p` cannot be converted to native. In + * this case, similar to 1) above, we simply return `p` as it is. Its child(ren) would still + * be native Comet operators. * * After this rule finishes, we'll do another pass on the final plan to convert all adjacent - * Comet native operators into a single native execution block. Please see where `convertBlock` - * is called below. + * Comet native operators into a single native execution block. Please see where + * `convertBlock` is called below. * * Here are a few examples: * - * Scan ======> CometScan \| | Filter CometFilter \| | HashAggregate CometHashAggregate \| | - * Exchange CometExchange \| | HashAggregate CometHashAggregate \| | UnsupportedOperator - * UnsupportedOperator + * Scan ======> CometScan + * | | + * Filter CometFilter + * | | + * HashAggregate CometHashAggregate + * | | + * Exchange CometExchange + * | | + * HashAggregate CometHashAggregate + * | | + * UnsupportedOperator UnsupportedOperator * * Native execution doesn't necessarily have to start from `CometScan`: * - * Scan =======> CometScan \| | UnsupportedOperator UnsupportedOperator \| | HashAggregate - * HashAggregate \| | Exchange CometExchange \| | HashAggregate CometHashAggregate \| | - * UnsupportedOperator UnsupportedOperator + * Scan =======> CometScan + * | | + * UnsupportedOperator UnsupportedOperator + * | | + * HashAggregate HashAggregate + * | | + * Exchange CometExchange + * | | + * HashAggregate CometHashAggregate + * | | + * UnsupportedOperator UnsupportedOperator * * A sink can also be Comet operators other than `CometExchange`, for instance `CometUnion`: * - * Scan Scan =======> CometScan CometScan \| | | | Filter Filter CometFilter CometFilter \| | | - * \| Union CometUnion \| | Project CometProject + * Scan Scan =======> CometScan CometScan + * | | | | + * Filter Filter CometFilter CometFilter + * | | | | + * Union CometUnion + * | | + * Project CometProject */ // spotless:on private def transform(plan: SparkPlan): SparkPlan = { @@ -227,7 +249,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { if (multiMode || sparkFinalMode) { op } else { - val plan1 = newPlanWithProto( + newPlanWithProto( op, nativeOp => { // The aggExprs could be empty. 
For example, if the aggregate functions only have @@ -247,8 +269,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { op.child, SerializedPlan(None)) }) - println(plan1) - plan1 } case op: ShuffledHashJoinExec @@ -519,6 +539,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { val cometOp = CometLocalTableScanExec(op, op.rows, op.output) CometScanWrapper(nativeOp.get, cometOp) + case op: LocalTableScanExec if !CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf) => + withInfo(op, "LocalTableScan is not enabled") + case op => op match { case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index da39fb44d3..d6579b69d3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1174,7 +1174,6 @@ object QueryPlanSerde extends Logging with CometExprShim { if (aggregate.isInstanceOf[HashAggregateExec] || aggregate.isInstanceOf[ObjectHashAggregateExec]) && CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) => - println("COMET_EXEC_AGGREGATE_ENABLED0") val groupingExpressions = aggregate.groupingExpressions val aggregateExpressions = aggregate.aggregateExpressions val aggregateAttributes = aggregate.aggregateAttributes @@ -1182,14 +1181,12 @@ object QueryPlanSerde extends Logging with CometExprShim { val child = aggregate.child if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) { - println("COMET_EXEC_AGGREGATE_ENABLED1") withInfo(op, "No group by or aggregation") return None } // Aggregate expressions with filter are not supported yet. if (aggregateExpressions.exists(_.filter.isDefined)) { - println("COMET_EXEC_AGGREGATE_ENABLED2") withInfo(op, "Aggregate expression with filter is not supported") return None } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 4b0def6e7e..611d367f35 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -90,7 +90,9 @@ case class CometLocalTableScanExec( override def equals(obj: Any): Boolean = { obj match { case other: CometLocalTableScanExec => - this.originalPlan == other.originalPlan && this.schema == other.schema && this.output == other.output + this.originalPlan == other.originalPlan && + this.schema == other.schema && + this.output == other.output case _ => false } From 55992c3570a55c777f9cde6736228c5d60e8fbec Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 8 Nov 2025 19:54:32 +0400 Subject: [PATCH 6/9] Some refactoring --- spark/src/main/scala/org/apache/spark/sql/comet/operators.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index a5c4672fc7..d7a743eb2d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -298,6 +298,7 @@ abstract class CometNativeExec extends CometExec { if (containsBroadcastInput && firstNonBroadcastPlan.isEmpty) { throw new CometRuntimeException(s"Cannot find the first non broadcast plan: $this") } + // If the first non broadcast 
plan is found, we need to adjust the partition number of // the broadcast plans to make sure they have the same partition number as the first non // broadcast plan. From 87f7b40736ba3fea0c84397817b23453dd4c80b2 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 8 Nov 2025 20:36:40 +0400 Subject: [PATCH 7/9] Add some unit tests --- .../apache/comet/exec/CometExecSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 56174c7fc0..f1f0e48e03 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2277,6 +2277,26 @@ class CometExecSuite extends CometTestBase { } } + test("LocalTableScanExec spark fallback") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") { + val df = Seq.range(0, 10).toDF("id") + checkSparkAnswerAndFallbackReason(df, "LocalTableScan is not enabled") + } + } + + test("LocalTableScanExec with filter") { + val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5) + checkSparkAnswerAndOperator(df) + } + + test("LocalTableScanExec with groupBy") { + val df = (Seq.range(0, 10) ++ Seq.range(0, 20)) + .toDF("id") + .groupBy(col("id")) + .agg(count("*")) + checkSparkAnswerAndOperator(df) + } + } case class BucketedTableTestSpec( From e621100f1811e5e49d1084d520295c4e5d24c33a Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 8 Nov 2025 22:26:32 +0400 Subject: [PATCH 8/9] Fix PR issues --- .../scala/org/apache/comet/CometConf.scala | 2 +- .../apache/comet/rules/CometExecRule.scala | 93 +++++++++---------- .../serde/operator/CometLocalTableScan.scala | 2 +- .../apache/comet/exec/CometExecSuite.scala | 18 ++-- 4 files changed, 55 insertions(+), 60 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index e7d0be2ede..f28351c476 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -255,7 +255,7 @@ object CometConf extends ShimCometConf { val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("takeOrderedAndProject", defaultValue = true) val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig("localTableScan", defaultValue = true) + createExecEnabledConfig("localTableScan", defaultValue = false) val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 52ea7faf06..d511df2f8d 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -78,63 +78,49 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { /** * Tries to transform a Spark physical plan into a Comet plan. * - * This rule traverses bottom-up from the original Spark plan and for each plan node, there - * are a few cases to consider: + * This rule traverses bottom-up from the original Spark plan and for each plan node, there are + * a few cases to consider: * - * 1. 
The child(ren) of the current node `p` cannot be converted to native - * In this case, we'll simply return the original Spark plan, since Comet native - * execution cannot start from an arbitrary Spark operator (unless it is special node - * such as scan or sink such as shuffle exchange, union etc., which are wrapped by - * `CometScanWrapper` and `CometSinkPlaceHolder` respectively). + * 1. The child(ren) of the current node `p` cannot be converted to native In this case, we'll + * simply return the original Spark plan, since Comet native execution cannot start from an + * arbitrary Spark operator (unless it is special node such as scan or sink such as shuffle + * exchange, union etc., which are wrapped by `CometScanWrapper` and `CometSinkPlaceHolder` + * respectively). * - * 2. The child(ren) of the current node `p` can be converted to native - * There are two sub-cases for this scenario: 1) This node `p` can also be converted to - * native. In this case, we'll create a new native Comet operator for `p` and connect it with - * its previously converted child(ren); 2) This node `p` cannot be converted to native. In - * this case, similar to 1) above, we simply return `p` as it is. Its child(ren) would still - * be native Comet operators. + * 2. The child(ren) of the current node `p` can be converted to native There are two sub-cases + * for this scenario: 1) This node `p` can also be converted to native. In this case, we'll + * create a new native Comet operator for `p` and connect it with its previously converted + * child(ren); 2) This node `p` cannot be converted to native. In this case, similar to 1) + * above, we simply return `p` as it is. Its child(ren) would still be native Comet operators. * * After this rule finishes, we'll do another pass on the final plan to convert all adjacent - * Comet native operators into a single native execution block. Please see where - * `convertBlock` is called below. + * Comet native operators into a single native execution block. Please see where `convertBlock` + * is called below. 
* * Here are a few examples: * - * Scan ======> CometScan - * | | - * Filter CometFilter - * | | - * HashAggregate CometHashAggregate - * | | - * Exchange CometExchange - * | | - * HashAggregate CometHashAggregate - * | | - * UnsupportedOperator UnsupportedOperator + * Scan ======> CometScan + * \| | Filter CometFilter + * \| | HashAggregate CometHashAggregate + * \| | Exchange CometExchange + * \| | HashAggregate CometHashAggregate + * \| | UnsupportedOperator UnsupportedOperator * * Native execution doesn't necessarily have to start from `CometScan`: * - * Scan =======> CometScan - * | | - * UnsupportedOperator UnsupportedOperator - * | | - * HashAggregate HashAggregate - * | | - * Exchange CometExchange - * | | - * HashAggregate CometHashAggregate - * | | - * UnsupportedOperator UnsupportedOperator + * Scan =======> CometScan + * \| | UnsupportedOperator UnsupportedOperator + * \| | HashAggregate HashAggregate + * \| | Exchange CometExchange + * \| | HashAggregate CometHashAggregate + * \| | UnsupportedOperator UnsupportedOperator * * A sink can also be Comet operators other than `CometExchange`, for instance `CometUnion`: * - * Scan Scan =======> CometScan CometScan - * | | | | - * Filter Filter CometFilter CometFilter - * | | | | - * Union CometUnion - * | | - * Project CometProject + * Scan Scan =======> CometScan CometScan + * \| | | | Filter Filter CometFilter CometFilter + * \| | | | Union CometUnion + * \| | Project CometProject */ // spotless:on private def transform(plan: SparkPlan): SparkPlan = { @@ -534,13 +520,18 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { s } - case op: LocalTableScanExec if CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf) => - val nativeOp = QueryPlanSerde.operator2Proto(op) - val cometOp = CometLocalTableScanExec(op, op.rows, op.output) - CometScanWrapper(nativeOp.get, cometOp) - - case op: LocalTableScanExec if !CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf) => - withInfo(op, "LocalTableScan is not enabled") + case op: LocalTableScanExec => + if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) { + QueryPlanSerde + .operator2Proto(op) + .map { nativeOp => + val cometOp = CometLocalTableScanExec(op, op.rows, op.output) + CometScanWrapper(nativeOp, cometOp) + } + .getOrElse(op) + } else { + withInfo(op, "LocalTableScan is not enabled") + } case op => op match { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala index ef270af4a4..e3e8538cf6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalTableScan.scala @@ -19,7 +19,7 @@ package org.apache.comet.serde.operator -import scala.jdk.CollectionConverters.asJavaIterableConverter +import scala.jdk.CollectionConverters._ import org.apache.spark.sql.execution.LocalTableScanExec diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index f1f0e48e03..e9fe3859a6 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2285,16 +2285,20 @@ class CometExecSuite extends CometTestBase { } test("LocalTableScanExec with filter") { - val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5) - checkSparkAnswerAndOperator(df) + 
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = Seq.range(0, 10).toDF("id").filter(col("id") > 5) + checkSparkAnswerAndOperator(df) + } } test("LocalTableScanExec with groupBy") { - val df = (Seq.range(0, 10) ++ Seq.range(0, 20)) - .toDF("id") - .groupBy(col("id")) - .agg(count("*")) - checkSparkAnswerAndOperator(df) + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val df = (Seq.range(0, 10) ++ Seq.range(0, 20)) + .toDF("id") + .groupBy(col("id")) + .agg(count("*")) + checkSparkAnswerAndOperator(df) + } } } From d7dffe266f38cdc7cdae19e18784e0e8a4434764 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 9 Nov 2025 09:41:48 +0400 Subject: [PATCH 9/9] Fix PR issues --- docs/source/user-guide/latest/configs.md | 2 +- .../apache/comet/rules/CometExecRule.scala | 74 +++++++++++-------- 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 72d07649d9..041ec7c173 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -149,7 +149,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | | `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true | -| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | true | +| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false | | `spark.comet.exec.project.enabled` | Whether to enable project by default. | true | | `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true | | `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true | diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index d511df2f8d..4f41f33414 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -78,49 +78,63 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { /** * Tries to transform a Spark physical plan into a Comet plan. * - * This rule traverses bottom-up from the original Spark plan and for each plan node, there are - * a few cases to consider: + * This rule traverses bottom-up from the original Spark plan and for each plan node, there + * are a few cases to consider: * - * 1. The child(ren) of the current node `p` cannot be converted to native In this case, we'll - * simply return the original Spark plan, since Comet native execution cannot start from an - * arbitrary Spark operator (unless it is special node such as scan or sink such as shuffle - * exchange, union etc., which are wrapped by `CometScanWrapper` and `CometSinkPlaceHolder` - * respectively). + * 1. The child(ren) of the current node `p` cannot be converted to native + * In this case, we'll simply return the original Spark plan, since Comet native + * execution cannot start from an arbitrary Spark operator (unless it is special node + * such as scan or sink such as shuffle exchange, union etc., which are wrapped by + * `CometScanWrapper` and `CometSinkPlaceHolder` respectively). * - * 2. 
The child(ren) of the current node `p` can be converted to native There are two sub-cases - * for this scenario: 1) This node `p` can also be converted to native. In this case, we'll - * create a new native Comet operator for `p` and connect it with its previously converted - * child(ren); 2) This node `p` cannot be converted to native. In this case, similar to 1) - * above, we simply return `p` as it is. Its child(ren) would still be native Comet operators. + * 2. The child(ren) of the current node `p` can be converted to native + * There are two sub-cases for this scenario: 1) This node `p` can also be converted to + * native. In this case, we'll create a new native Comet operator for `p` and connect it with + * its previously converted child(ren); 2) This node `p` cannot be converted to native. In + * this case, similar to 1) above, we simply return `p` as it is. Its child(ren) would still + * be native Comet operators. * * After this rule finishes, we'll do another pass on the final plan to convert all adjacent - * Comet native operators into a single native execution block. Please see where `convertBlock` - * is called below. + * Comet native operators into a single native execution block. Please see where + * `convertBlock` is called below. * * Here are a few examples: * - * Scan ======> CometScan - * \| | Filter CometFilter - * \| | HashAggregate CometHashAggregate - * \| | Exchange CometExchange - * \| | HashAggregate CometHashAggregate - * \| | UnsupportedOperator UnsupportedOperator + * Scan ======> CometScan + * | | + * Filter CometFilter + * | | + * HashAggregate CometHashAggregate + * | | + * Exchange CometExchange + * | | + * HashAggregate CometHashAggregate + * | | + * UnsupportedOperator UnsupportedOperator * * Native execution doesn't necessarily have to start from `CometScan`: * - * Scan =======> CometScan - * \| | UnsupportedOperator UnsupportedOperator - * \| | HashAggregate HashAggregate - * \| | Exchange CometExchange - * \| | HashAggregate CometHashAggregate - * \| | UnsupportedOperator UnsupportedOperator + * Scan =======> CometScan + * | | + * UnsupportedOperator UnsupportedOperator + * | | + * HashAggregate HashAggregate + * | | + * Exchange CometExchange + * | | + * HashAggregate CometHashAggregate + * | | + * UnsupportedOperator UnsupportedOperator * * A sink can also be Comet operators other than `CometExchange`, for instance `CometUnion`: * - * Scan Scan =======> CometScan CometScan - * \| | | | Filter Filter CometFilter CometFilter - * \| | | | Union CometUnion - * \| | Project CometProject + * Scan Scan =======> CometScan CometScan + * | | | | + * Filter Filter CometFilter CometFilter + * | | | | + * Union CometUnion + * | | + * Project CometProject */ // spotless:on private def transform(plan: SparkPlan): SparkPlan = {
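
To close out the series: after patch 8/9 the feature ships disabled by default, so coverage has to opt in per test, exactly as the suite changes in patches 7/9 and 8/9 do. The sketch below is a hypothetical extra case in that same style, not part of the series; withSQLConf and checkSparkAnswerAndOperator are the existing helpers the modified CometExecSuite already relies on, while the suite name, the CometTestBase import location, and the testImplicits import are assumptions about the test base class.

    package org.apache.comet.exec

    import org.apache.spark.sql.CometTestBase // adjust to wherever CometTestBase is defined
    import org.apache.spark.sql.functions.col

    import org.apache.comet.CometConf

    // Hypothetical companion suite in the style of the patch 7/9 tests.
    class CometLocalTableScanSuite extends CometTestBase {
      import testImplicits._

      test("LocalTableScanExec with filter and projection") {
        withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
          val df = Seq((1, "a"), (2, "b"), (3, "c"))
            .toDF("id", "name")
            .filter(col("id") > 1)
            .select(col("name"))
          // Passes only if every operator in the executed plan is a Comet operator,
          // i.e. the local scan was converted to CometLocalTableScanExec.
          checkSparkAnswerAndOperator(df)
        }
      }
    }
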