Skip to content

Commit 0abb752

Browse files
authored
Merge pull request #26 from SOFTNETWORK-APP/fix/termsAgg
- add support for count, min, max, avg and sum with partitions
- add full support for windowing with multiple partitions
- add support for bucket script
- normalize the SQL query before parsing it
- fix terms aggregation
- fix COUNT(*)
2 parents 071737b + ff1349c commit 0abb752

File tree

55 files changed

+7095
-1354
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

55 files changed

+7095
-1354
lines changed

README.md

Lines changed: 706 additions & 22 deletions
Large diffs are not rendered by default.

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 402 additions & 134 deletions
Large diffs are not rendered by default.

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticSearchRequest.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
package app.softnetwork.elastic.sql.bridge
1818

19-
import app.softnetwork.elastic.sql.query.{Bucket, Criteria, Except, Field}
19+
import app.softnetwork.elastic.sql.query.{Bucket, Criteria, Except, Field, FieldSort}
2020
import com.sksamuel.elastic4s.requests.searches.{SearchBodyBuilderFn, SearchRequest}
2121

2222
case class ElasticSearchRequest(
23+
sql: String,
2324
fields: Seq[Field],
2425
except: Option[Except],
2526
sources: Seq[String],
@@ -28,7 +29,8 @@ case class ElasticSearchRequest(
2829
offset: Option[Int],
2930
search: SearchRequest,
3031
buckets: Seq[Bucket] = Seq.empty,
31-
aggregations: Seq[ElasticAggregation] = Seq.empty
32+
having: Option[Criteria] = None,
33+
sorts: Seq[FieldSort] = Seq.empty
3234
) {
3335
def minScore(score: Option[Double]): ElasticSearchRequest = {
3436
score match {

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 96 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616

1717
package app.softnetwork.elastic.sql
1818

19-
import app.softnetwork.elastic.sql.`type`.{SQLBigInt, SQLDouble, SQLTemporal, SQLVarchar}
19+
import app.softnetwork.elastic.sql.`type`.{
20+
SQLBigInt,
21+
SQLDouble,
22+
SQLNumeric,
23+
SQLTemporal,
24+
SQLVarchar
25+
}
2026
import app.softnetwork.elastic.sql.function.aggregate.COUNT
2127
import app.softnetwork.elastic.sql.function.geo.{Distance, Meters}
2228
import app.softnetwork.elastic.sql.operator._
@@ -27,14 +33,14 @@ import com.sksamuel.elastic4s.requests.common.FetchSourceContext
2733
import com.sksamuel.elastic4s.requests.script.Script
2834
import com.sksamuel.elastic4s.requests.script.ScriptType.Source
2935
import com.sksamuel.elastic4s.requests.searches.aggs.{
30-
Aggregation,
36+
AbstractAggregation,
3137
FilterAggregation,
3238
NestedAggregation,
3339
TermsAggregation
3440
}
3541
import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery
3642
import com.sksamuel.elastic4s.requests.searches.queries.{InnerHit, Query}
37-
import com.sksamuel.elastic4s.requests.searches.sort.FieldSort
43+
import com.sksamuel.elastic4s.requests.searches.sort.{FieldSort, ScriptSort, ScriptSortType}
3844
import com.sksamuel.elastic4s.requests.searches.{
3945
MultiSearchRequest,
4046
SearchBodyBuilderFn,
@@ -142,10 +148,10 @@ package object bridge {
142148
implicit def requestToRootAggregations(
143149
request: SQLSearchRequest,
144150
aggregations: Seq[ElasticAggregation]
145-
): Seq[Aggregation] = {
151+
): Seq[AbstractAggregation] = {
146152
val notNestedAggregations = aggregations.filterNot(_.nested)
147153

148-
val notNestedBuckets = request.buckets.filterNot(_.nested)
154+
val notNestedBuckets = request.bucketTree.filterNot(_.bucket.nested)
149155

150156
val rootAggregations = notNestedAggregations match {
151157
case Nil =>
@@ -158,8 +164,8 @@ package object bridge {
158164
None,
159165
aggregations
160166
) match {
161-
case Some(b) => Seq(b)
162-
case _ => Seq.empty
167+
case Nil => Seq.empty
168+
case aggs => aggs
163169
}
164170
buckets
165171
case aggs =>
@@ -173,14 +179,18 @@ package object bridge {
173179
val buckets = ElasticAggregation.buildBuckets(
174180
notNestedBuckets,
175181
request.sorts -- directions.keys,
176-
aggregations,
182+
aggs,
177183
directions,
178184
request.having.flatMap(_.criteria),
179185
None,
180186
aggs
181187
) match {
182-
case Some(b) => Seq(b)
183-
case _ => aggregations
188+
case Nil => aggs.map(_.agg)
189+
case aggs =>
190+
if (request.groupBy.isEmpty && request.windowFunctions.exists(_.isWindowing))
191+
notNestedAggregations.filter(_.bucketPath.isEmpty).map(_.agg) ++ aggs
192+
else
193+
aggs
184194
}
185195
buckets
186196
}
@@ -204,12 +214,14 @@ package object bridge {
204214

205215
// Group nested buckets by their nested path
206216
val nestedGroupedBuckets =
207-
request.buckets
208-
.filter(_.nested)
209-
.groupBy(
210-
_.nestedBucket.getOrElse(
211-
throw new IllegalArgumentException(
212-
"Nested bucket must have a nested element"
217+
request.bucketTree
218+
.filter(_.bucket.nested)
219+
.map(tree =>
220+
tree.groupBy(
221+
_.bucket.nestedBucket.getOrElse(
222+
throw new IllegalArgumentException(
223+
"Nested bucket must have a nested element"
224+
)
213225
)
214226
)
215227
)
@@ -229,17 +241,16 @@ package object bridge {
229241

230242
// Get the buckets for this nested element
231243
val nestedBuckets =
232-
nestedGroupedBuckets.getOrElse(n.innerHitsName, Seq.empty)
244+
nestedGroupedBuckets.map(_.getOrElse(n.innerHitsName, Seq.empty))
233245

234246
val notRelatedAggregationsToBuckets = elasticAggregations
235247
.filterNot { ea =>
236-
nestedBuckets.exists(nb => nb.identifier.path == ea.sourceField)
248+
nestedBuckets.flatten.exists(nb => nb.bucket.identifier.path == ea.sourceField)
237249
}
238-
.map(_.agg)
239250

240251
val relatedAggregationsToBuckets = elasticAggregations
241252
.filter { ea =>
242-
nestedBuckets.exists(nb => nb.identifier.path == ea.sourceField)
253+
nestedBuckets.flatten.exists(nb => nb.bucket.identifier.path == ea.sourceField)
243254
}
244255
.map(_.agg)
245256

@@ -257,7 +268,7 @@ package object bridge {
257268
requestToNestedFilterAggregation(request, n.innerHitsName)
258269

259270
// Build buckets for this nested aggregation
260-
val buckets: Seq[Aggregation] =
271+
val buckets: Seq[AbstractAggregation] =
261272
ElasticAggregation.buildBuckets(
262273
nestedBuckets,
263274
request.sorts -- directions.keys,
@@ -267,8 +278,12 @@ package object bridge {
267278
Some(n),
268279
aggregations
269280
) match {
270-
case Some(b) => Seq(b)
271-
case _ => notRelatedAggregationsToBuckets
281+
case Nil => notRelatedAggregationsToBuckets.map(_.agg)
282+
case aggs =>
283+
if (request.groupBy.isEmpty && request.windowFunctions.exists(_.isWindowing))
284+
notRelatedAggregationsToBuckets.filter(_.bucketPath.isEmpty).map(_.agg) ++ aggs
285+
else
286+
aggs
272287
}
273288

274289
val children = n.children
@@ -373,7 +388,7 @@ package object bridge {
373388
}
374389

375390
private def addNestedAggregationsToTermsAggregation(
376-
agg: Aggregation,
391+
agg: AbstractAggregation,
377392
nested: Seq[NestedAggregation]
378393
): Option[TermsAggregation] = {
379394
agg match {
@@ -397,24 +412,29 @@ package object bridge {
397412

398413
implicit def requestToElasticSearchRequest(request: SQLSearchRequest): ElasticSearchRequest =
399414
ElasticSearchRequest(
415+
request.sql,
400416
request.select.fields,
401417
request.select.except,
402418
request.sources,
403419
request.where.flatMap(_.criteria),
404420
request.limit.map(_.limit),
405-
request.limit.flatMap(_.offset.map(_.offset)),
421+
request.limit.flatMap(_.offset.map(_.offset)).orElse(Some(0)),
406422
request,
407423
request.buckets,
408-
request.aggregates.map(
409-
ElasticAggregation(_, request.having.flatMap(_.criteria), request.sorts)
410-
)
424+
request.having.flatMap(_.criteria),
425+
request.orderBy.map(_.sorts).getOrElse(Seq.empty)
411426
).minScore(request.score)
412427

413428
implicit def requestToSearchRequest(request: SQLSearchRequest): SearchRequest = {
414429
import request._
415430

416431
val aggregations = request.aggregates.map(
417-
ElasticAggregation(_, request.having.flatMap(_.criteria), request.sorts)
432+
ElasticAggregation(
433+
_,
434+
request.having.flatMap(_.criteria),
435+
request.sorts,
436+
request.sqlAggregations
437+
)
418438
)
419439

420440
val rootAggregations = requestToRootAggregations(request, aggregations)
@@ -457,7 +477,7 @@ package object bridge {
457477
_search
458478
}
459479

460-
_search = scriptFields.filterNot(_.aggregation) match {
480+
_search = scriptFields.filterNot(_.isAggregation) match {
461481
case Nil => _search
462482
case _ =>
463483
_search scriptfields scriptFields.map { field =>
@@ -478,17 +498,55 @@ package object bridge {
478498

479499
_search = orderBy match {
480500
case Some(o) if aggregates.isEmpty && buckets.isEmpty =>
481-
_search sortBy o.sorts.map(sort =>
482-
sort.order match {
483-
case Some(Desc) => FieldSort(sort.field).desc()
484-
case _ => FieldSort(sort.field).asc()
501+
_search sortBy o.sorts.map { sort =>
502+
if (sort.isScriptSort) {
503+
val context = PainlessContext()
504+
val painless = sort.field.painless(Some(context))
505+
val painlessScript = s"$context$painless"
506+
val script =
507+
sort.out match {
508+
case _: SQLTemporal if !painless.endsWith("toEpochMilli()") =>
509+
val parts = painlessScript.split(";").toSeq
510+
if (parts.size > 1) {
511+
val lastPart = parts.last.trim.stripPrefix("return ")
512+
if (lastPart.split(" ").toSeq.size == 1) {
513+
val newLastPart =
514+
s"""($lastPart != null) ? $lastPart.toInstant().toEpochMilli() : null"""
515+
s"${parts.dropRight(1).mkString(";")}; return $newLastPart"
516+
} else {
517+
painlessScript
518+
}
519+
} else {
520+
s"$painlessScript.toInstant().toEpochMilli()"
521+
}
522+
case _ => painlessScript
523+
}
524+
val scriptSort =
525+
ScriptSort(
526+
script = Script(script = script)
527+
.lang("painless")
528+
.scriptType(Source),
529+
scriptSortType = sort.field.out match {
530+
case _: SQLTemporal | _: SQLNumeric => ScriptSortType.Number
531+
case _ => ScriptSortType.String
532+
}
533+
)
534+
sort.order match {
535+
case Some(Desc) => scriptSort.desc()
536+
case _ => scriptSort.asc()
537+
}
538+
} else {
539+
sort.order match {
540+
case Some(Desc) => FieldSort(sort.field.aliasOrName).desc()
541+
case _ => FieldSort(sort.field.aliasOrName).asc()
542+
}
485543
}
486-
)
544+
}
487545
case _ => _search
488546
}
489547

490548
if (allAggregations.nonEmpty && fields.isEmpty) {
491-
_search size 0
549+
_search size 0 fetchSource false
492550
} else {
493551
limit match {
494552
case Some(l) => _search limit l.limit from l.offset.map(_.offset).getOrElse(0)
@@ -512,7 +570,7 @@ package object bridge {
512570

513571
implicit def expressionToQuery(expression: GenericExpression): Query = {
514572
import expression._
515-
if (aggregation)
573+
if (isAggregation)
516574
return matchAllQuery()
517575
if (
518576
identifier.functions.nonEmpty && (identifier.functions.size > 1 || (identifier.functions.head match {
@@ -946,7 +1004,7 @@ package object bridge {
9461004
case Left(l) =>
9471005
val filteredAgg: Option[FilterAggregation] = requestToFilterAggregation(l)
9481006
l.aggregates
949-
.map(ElasticAggregation(_, l.having.flatMap(_.criteria), l.sorts))
1007+
.map(ElasticAggregation(_, l.having.flatMap(_.criteria), l.sorts, l.sqlAggregations))
9501008
.map(aggregation => {
9511009
val queryFiltered =
9521010
l.where

0 commit comments

Comments
 (0)