Skip to content

Commit c39d9b9

Browse files
committed
minor refactoring for window functions
1 parent 9db439f commit c39d9b9

File tree

7 files changed

+57
-56
lines changed

7 files changed

+57
-56
lines changed

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ object ElasticAggregation {
113113
s"${aggType}_distinct_${sourceField.replace(".", "_")}"
114114
else {
115115
aggType match {
116-
case th: TopHitsAggregation =>
117-
s"${th.topHits.sql.toLowerCase}_${sourceField.replace(".", "_")}"
116+
case th: WindowFunction =>
117+
s"${th.window.sql.toLowerCase}_${sourceField.replace(".", "_")}"
118118
case _ =>
119119
s"${aggType}_${sourceField.replace(".", "_")}"
120120

@@ -154,7 +154,7 @@ object ElasticAggregation {
154154
case MAX => aggWithFieldOrScript(maxAgg, (name, s) => maxAgg(name, sourceField).script(s))
155155
case AVG => aggWithFieldOrScript(avgAgg, (name, s) => avgAgg(name, sourceField).script(s))
156156
case SUM => aggWithFieldOrScript(sumAgg, (name, s) => sumAgg(name, sourceField).script(s))
157-
case th: TopHitsAggregation =>
157+
case th: WindowFunction =>
158158
val limit = {
159159
th match {
160160
case _: LastValue => 1
@@ -184,12 +184,12 @@ object ElasticAggregation {
184184
.size(limit) sortBy th.orderBy.sorts.map(sort =>
185185
sort.order match {
186186
case Some(Desc) =>
187-
th.topHits match {
187+
th.window match {
188188
case LAST_VALUE => FieldSort(sort.field).asc()
189189
case _ => FieldSort(sort.field).desc()
190190
}
191191
case _ =>
192-
th.topHits match {
192+
th.window match {
193193
case LAST_VALUE => FieldSort(sort.field).desc()
194194
case _ => FieldSort(sort.field).asc()
195195
}

es6/bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ object ElasticAggregation {
113113
s"${aggType}_distinct_${sourceField.replace(".", "_")}"
114114
else {
115115
aggType match {
116-
case th: TopHitsAggregation =>
117-
s"${th.topHits.sql.toLowerCase}_${sourceField.replace(".", "_")}"
116+
case th: WindowFunction =>
117+
s"${th.window.sql.toLowerCase}_${sourceField.replace(".", "_")}"
118118
case _ =>
119119
s"${aggType}_${sourceField.replace(".", "_")}"
120120

@@ -154,7 +154,7 @@ object ElasticAggregation {
154154
case MAX => aggWithFieldOrScript(maxAgg, (name, s) => maxAgg(name, sourceField).script(s))
155155
case AVG => aggWithFieldOrScript(avgAgg, (name, s) => avgAgg(name, sourceField).script(s))
156156
case SUM => aggWithFieldOrScript(sumAgg, (name, s) => sumAgg(name, sourceField).script(s))
157-
case th: TopHitsAggregation =>
157+
case th: WindowFunction =>
158158
val limit = {
159159
th match {
160160
case _: LastValue => 1
@@ -184,12 +184,12 @@ object ElasticAggregation {
184184
.size(limit) sortBy th.orderBy.sorts.map(sort =>
185185
sort.order match {
186186
case Some(Desc) =>
187-
th.topHits match {
187+
th.window match {
188188
case LAST_VALUE => FieldSort(sort.field).asc()
189189
case _ => FieldSort(sort.field).desc()
190190
}
191191
case _ =>
192-
th.topHits match {
192+
th.window match {
193193
case LAST_VALUE => FieldSort(sort.field).desc()
194194
case _ => FieldSort(sort.field).asc()
195195
}

sql/src/main/scala/app/softnetwork/elastic/sql/function/aggregate/package.scala

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,32 +35,32 @@ package object aggregate {
3535

3636
case object SUM extends Expr("SUM") with AggregateFunction
3737

38-
sealed trait TopHits extends TokenRegex
38+
sealed trait Window extends TokenRegex
3939

40-
case object FIRST_VALUE extends Expr("FIRST_VALUE") with TopHits {
40+
case object FIRST_VALUE extends Expr("FIRST_VALUE") with Window {
4141
override val words: List[String] = List(sql, "FIRST")
4242
}
4343

44-
case object LAST_VALUE extends Expr("LAST_VALUE") with TopHits {
44+
case object LAST_VALUE extends Expr("LAST_VALUE") with Window {
4545
override val words: List[String] = List(sql, "LAST")
4646
}
4747

48-
case object ARRAY_AGG extends Expr("ARRAY_AGG") with TopHits {
48+
case object ARRAY_AGG extends Expr("ARRAY_AGG") with Window {
4949
override val words: List[String] = List(sql, "ARRAY")
5050
}
5151

5252
case object OVER extends Expr("OVER") with TokenRegex
5353

5454
case object PARTITION_BY extends Expr("PARTITION BY") with TokenRegex
5555

56-
sealed trait TopHitsAggregation
56+
sealed trait WindowFunction
5757
extends AggregateFunction
5858
with FunctionWithIdentifier
5959
with Updateable {
6060
def partitionBy: Seq[Identifier]
61-
def withPartitionBy(partitionBy: Seq[Identifier]): TopHitsAggregation
61+
def withPartitionBy(partitionBy: Seq[Identifier]): WindowFunction
6262
def orderBy: OrderBy
63-
def topHits: TopHits
63+
def window: Window
6464
def limit: Option[Limit]
6565

6666
lazy val buckets: Seq[Bucket] = partitionBy.map(identifier => Bucket(identifier, None))
@@ -73,16 +73,16 @@ package object aggregate {
7373
val partitionByStr =
7474
if (partitionBy.nonEmpty) s"$PARTITION_BY ${partitionBy.mkString(", ")}"
7575
else ""
76-
s"$topHits($identifier) $OVER ($partitionByStr$orderBy)"
76+
s"$window($identifier) $OVER ($partitionByStr$orderBy)"
7777
}
7878

7979
override def toSQL(base: String): String = sql
8080

8181
def fields: Seq[Field]
8282

83-
def withFields(fields: Seq[Field]): TopHitsAggregation
83+
def withFields(fields: Seq[Field]): WindowFunction
8484

85-
def update(request: SQLSearchRequest): TopHitsAggregation = {
85+
def update(request: SQLSearchRequest): WindowFunction = {
8686
val updated = this
8787
.withPartitionBy(partitionBy = partitionBy.map(_.update(request)))
8888
updated.withFields(
@@ -101,13 +101,13 @@ package object aggregate {
101101
partitionBy: Seq[Identifier] = Seq.empty,
102102
orderBy: OrderBy,
103103
fields: Seq[Field] = Seq.empty
104-
) extends TopHitsAggregation {
104+
) extends WindowFunction {
105105
override def limit: Option[Limit] = Some(Limit(1, None))
106-
override def topHits: TopHits = FIRST_VALUE
107-
override def withPartitionBy(partitionBy: Seq[Identifier]): TopHitsAggregation =
106+
override def window: Window = FIRST_VALUE
107+
override def withPartitionBy(partitionBy: Seq[Identifier]): WindowFunction =
108108
this.copy(partitionBy = partitionBy)
109-
override def withFields(fields: Seq[Field]): TopHitsAggregation = this.copy(fields = fields)
110-
override def update(request: SQLSearchRequest): TopHitsAggregation = super
109+
override def withFields(fields: Seq[Field]): WindowFunction = this.copy(fields = fields)
110+
override def update(request: SQLSearchRequest): WindowFunction = super
111111
.update(request)
112112
.asInstanceOf[FirstValue]
113113
.copy(
@@ -121,13 +121,13 @@ package object aggregate {
121121
partitionBy: Seq[Identifier] = Seq.empty,
122122
orderBy: OrderBy,
123123
fields: Seq[Field] = Seq.empty
124-
) extends TopHitsAggregation {
124+
) extends WindowFunction {
125125
override def limit: Option[Limit] = Some(Limit(1, None))
126-
override def topHits: TopHits = LAST_VALUE
127-
override def withPartitionBy(partitionBy: Seq[Identifier]): TopHitsAggregation =
126+
override def window: Window = LAST_VALUE
127+
override def withPartitionBy(partitionBy: Seq[Identifier]): WindowFunction =
128128
this.copy(partitionBy = partitionBy)
129-
override def withFields(fields: Seq[Field]): TopHitsAggregation = this.copy(fields = fields)
130-
override def update(request: SQLSearchRequest): TopHitsAggregation = super
129+
override def withFields(fields: Seq[Field]): WindowFunction = this.copy(fields = fields)
130+
override def update(request: SQLSearchRequest): WindowFunction = super
131131
.update(request)
132132
.asInstanceOf[LastValue]
133133
.copy(
@@ -142,12 +142,12 @@ package object aggregate {
142142
orderBy: OrderBy,
143143
fields: Seq[Field] = Seq.empty,
144144
limit: Option[Limit] = None
145-
) extends TopHitsAggregation {
146-
override def topHits: TopHits = ARRAY_AGG
147-
override def withPartitionBy(partitionBy: Seq[Identifier]): TopHitsAggregation =
145+
) extends WindowFunction {
146+
override def window: Window = ARRAY_AGG
147+
override def withPartitionBy(partitionBy: Seq[Identifier]): WindowFunction =
148148
this.copy(partitionBy = partitionBy)
149-
override def withFields(fields: Seq[Field]): TopHitsAggregation = this
150-
override def update(request: SQLSearchRequest): TopHitsAggregation = super
149+
override def withFields(fields: Seq[Field]): WindowFunction = this
150+
override def update(request: SQLSearchRequest): WindowFunction = super
151151
.update(request)
152152
.asInstanceOf[ArrayAgg]
153153
.copy(

sql/src/main/scala/app/softnetwork/elastic/sql/parser/SelectParser.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ trait SelectParser {
2222
self: Parser with WhereParser =>
2323

2424
def field: PackratParser[Field] =
25-
(identifierWithTopHits |
25+
(identifierWithWindowFunction |
2626
identifierWithArithmeticExpression |
2727
identifierWithTransformation |
2828
identifierWithAggregation |

sql/src/main/scala/app/softnetwork/elastic/sql/parser/function/aggregate/package.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,22 @@ package object aggregate {
5959
}
6060
}
6161

62-
def first_value: PackratParser[TopHitsAggregation] =
62+
def first_value: PackratParser[WindowFunction] =
6363
FIRST_VALUE.regex ~ top_hits ^^ { case _ ~ top =>
6464
FirstValue(top._1, top._2, top._3)
6565
}
6666

67-
def last_value: PackratParser[TopHitsAggregation] =
67+
def last_value: PackratParser[WindowFunction] =
6868
LAST_VALUE.regex ~ top_hits ^^ { case _ ~ top =>
6969
LastValue(top._1, top._2, top._3)
7070
}
7171

72-
def array_agg: PackratParser[TopHitsAggregation] =
72+
def array_agg: PackratParser[WindowFunction] =
7373
ARRAY_AGG.regex ~ top_hits ^^ { case _ ~ top =>
7474
ArrayAgg(top._1, top._2, top._3, limit = None)
7575
}
7676

77-
def identifierWithTopHits: PackratParser[Identifier] =
77+
def identifierWithWindowFunction: PackratParser[Identifier] =
7878
(first_value | last_value | array_agg) ^^ { th =>
7979
th.identifier.withFunctions(th +: th.identifier.functions)
8080
}

sql/src/main/scala/app/softnetwork/elastic/sql/query/SQLSearchRequest.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package app.softnetwork.elastic.sql.query
1818

19-
import app.softnetwork.elastic.sql.function.aggregate.TopHitsAggregation
19+
import app.softnetwork.elastic.sql.function.aggregate.WindowFunction
2020
import app.softnetwork.elastic.sql.{asString, Token}
2121

2222
case class SQLSearchRequest(
@@ -123,16 +123,17 @@ case class SQLSearchRequest(
123123
.filterNot(_.nested)
124124
.map(_.sourceField)
125125
.filterNot(f => excludes.contains(f))
126+
.distinct
126127
else
127128
Seq.empty
128129
}
129130

130-
lazy val topHitsFields: Seq[Field] = select.fields.filter(_.topHits.nonEmpty)
131+
lazy val windowFields: Seq[Field] = select.fields.filter(_.windows.nonEmpty)
131132

132-
lazy val topHitsAggs: Seq[TopHitsAggregation] = topHitsFields.flatMap(_.topHits)
133+
lazy val windowFunctions: Seq[WindowFunction] = windowFields.flatMap(_.windows)
133134

134135
lazy val aggregates: Seq[Field] =
135-
select.fields.filter(_.aggregation).filterNot(_.topHits.isDefined) ++ topHitsFields
136+
select.fields.filter(_.aggregation).filterNot(_.windows.isDefined) ++ windowFields
136137

137138
lazy val sqlAggregations: Map[String, SQLAggregation] =
138139
aggregates.flatMap(f => SQLAggregation.fromField(f, this)).map(a => a.aggName -> a).toMap
@@ -141,7 +142,7 @@ case class SQLSearchRequest(
141142

142143
lazy val sources: Seq[String] = from.tables.map(_.name)
143144

144-
lazy val topHitsBuckets: Seq[Bucket] = topHitsAggs
145+
lazy val windowBuckets: Seq[Bucket] = windowFunctions
145146
.flatMap(_.bucketNames)
146147
.filterNot(bucket =>
147148
groupBy.map(_.bucketNames).getOrElse(Map.empty).keys.toSeq.contains(bucket._1)
@@ -150,7 +151,7 @@ case class SQLSearchRequest(
150151
.values
151152
.toSeq
152153

153-
lazy val buckets: Seq[Bucket] = groupBy.map(_.buckets).getOrElse(Seq.empty) ++ topHitsBuckets
154+
lazy val buckets: Seq[Bucket] = groupBy.map(_.buckets).getOrElse(Seq.empty) ++ windowBuckets
154155

155156
override def validate(): Either[String, Unit] = {
156157
for {

sql/src/main/scala/app/softnetwork/elastic/sql/query/Select.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package app.softnetwork.elastic.sql.query
1818

19-
import app.softnetwork.elastic.sql.function.aggregate.{AggregateFunction, TopHitsAggregation}
19+
import app.softnetwork.elastic.sql.function.aggregate.{AggregateFunction, WindowFunction}
2020
import app.softnetwork.elastic.sql.function.{Function, FunctionChain, FunctionUtils}
2121
import app.softnetwork.elastic.sql.{
2222
asString,
@@ -64,19 +64,19 @@ case class Field(
6464

6565
override def functions: List[Function] = identifier.functions
6666

67-
lazy val topHits: Option[TopHitsAggregation] =
68-
functions.collectFirst { case th: TopHitsAggregation => th }
67+
lazy val windows: Option[WindowFunction] =
68+
functions.collectFirst { case th: WindowFunction => th }
6969

7070
def update(request: SQLSearchRequest): Field = {
71-
topHits match {
71+
windows match {
7272
case Some(th) =>
73-
val topHitsAggregation = th.update(request)
74-
val identifier = topHitsAggregation.identifier
73+
val windowFunction = th.update(request)
74+
val identifier = windowFunction.identifier
7575
identifier.functions match {
7676
case _ :: tail =>
77-
this.copy(identifier = identifier.withFunctions(functions = topHitsAggregation +: tail))
77+
this.copy(identifier = identifier.withFunctions(functions = windowFunction +: tail))
7878
case _ =>
79-
this.copy(identifier = identifier.withFunctions(functions = List(topHitsAggregation)))
79+
this.copy(identifier = identifier.withFunctions(functions = List(windowFunction)))
8080
}
8181
case None => this.copy(identifier = identifier.update(request))
8282
}
@@ -162,8 +162,8 @@ object SQLAggregation {
162162
s"${aggType}_distinct_${sourceField.replace(".", "_")}"
163163
else {
164164
aggType match {
165-
case th: TopHitsAggregation =>
166-
s"${th.topHits.sql.toLowerCase}_${sourceField.replace(".", "_")}"
165+
case th: WindowFunction =>
166+
s"${th.window.sql.toLowerCase}_${sourceField.replace(".", "_")}"
167167
case _ =>
168168
s"${aggType}_${sourceField.replace(".", "_")}"
169169

0 commit comments

Comments
 (0)