Skip to content

Commit eef5f28

Browse files
compheadmartin-g
andauthored
chore: check missingInput for Comet plan nodes (#2795)
* chore: check `missingInput` for Comet plan nodes --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
1 parent 71dda8b commit eef5f28

File tree

16 files changed

+44
-15
lines changed

16 files changed

+44
-15
lines changed

spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ package org.apache.spark.sql.comet
2222
import org.apache.spark.TaskContext
2323
import org.apache.spark.rdd.{ParallelCollectionRDD, RDD}
2424
import org.apache.spark.serializer.Serializer
25-
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
25+
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression, SortOrder}
2626
import org.apache.spark.sql.catalyst.util.truncatedString
2727
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
2828
import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
@@ -98,10 +98,15 @@ case class CometTakeOrderedAndProjectExec(
9898
child: SparkPlan)
9999
extends CometExec
100100
with UnaryExecNode {
101+
102+
override def producedAttributes: AttributeSet = outputSet ++ AttributeSet(projectList)
103+
101104
private lazy val writeMetrics =
102105
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
106+
103107
private lazy val readMetrics =
104108
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
109+
105110
override lazy val metrics: Map[String, SQLMetric] = Map(
106111
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
107112
"numPartitions" -> SQLMetrics.createMetric(

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
4+
CometTakeOrderedAndProject [s_city] [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
55
CometProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
66
CometBroadcastHashJoin [ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
77
CometHashAggregate [ss_addr_sk,sum,sum] [ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
4+
CometTakeOrderedAndProject [s_city] [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
55
CometProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
66
CometBroadcastHashJoin [ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
77
CometHashAggregate [ss_addr_sk,sum,sum] [ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
4+
CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
55
CometProject [c_last_name,c_first_name] [customer_id,customername,c_customer_id]
66
CometBroadcastHashJoin [c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
77
CometBroadcastExchange [c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
4+
CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
55
CometProject [c_last_name,c_first_name] [customer_id,customername,c_customer_id]
66
CometBroadcastHashJoin [c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
77
CometBroadcastExchange [c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
4+
CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
55
CometProject [c_last_name,c_first_name] [customer_id,customername,c_customer_id]
66
CometBroadcastHashJoin [c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
77
CometBroadcastExchange [c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
4+
CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
55
CometProject [c_last_name,c_first_name] [customer_id,customername,c_customer_id]
66
CometBroadcastHashJoin [c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
77
CometBroadcastExchange [c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
4+
CometTakeOrderedAndProject [s_city] [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
55
CometProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
66
CometBroadcastHashJoin [ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
77
CometHashAggregate [ss_addr_sk,sum,sum] [ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
4+
CometTakeOrderedAndProject [s_city] [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
55
CometProject [c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit,s_city]
66
CometBroadcastHashJoin [ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
77
CometHashAggregate [ss_addr_sk,sum,sum] [ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
4-
CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
4+
CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
55
CometProject [c_last_name,c_first_name] [customer_id,customername,c_customer_id]
66
CometBroadcastHashJoin [c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
77
CometBroadcastExchange [c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1

0 commit comments

Comments
 (0)