Skip to content

Commit 599694e

Browse files
committed
add isAggregation and asAggregation functions, fix _source for pure aggregations, update group by and order by to allow functions, update aggregations to allow complex expressions, implements script sort within order by
1 parent d69d691 commit 599694e

File tree

19 files changed

+551
-251
lines changed

19 files changed

+551
-251
lines changed

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 145 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package app.softnetwork.elastic.sql.bridge
1818

1919
import app.softnetwork.elastic.sql.PainlessContext
20+
import app.softnetwork.elastic.sql.`type`.SQLTemporal
2021
import app.softnetwork.elastic.sql.query.{
2122
Asc,
2223
Bucket,
@@ -31,6 +32,8 @@ import app.softnetwork.elastic.sql.query.{
3132
}
3233
import app.softnetwork.elastic.sql.function._
3334
import app.softnetwork.elastic.sql.function.aggregate._
35+
import app.softnetwork.elastic.sql.function.time.DateTrunc
36+
import app.softnetwork.elastic.sql.time.TimeUnit
3437
import com.sksamuel.elastic4s.ElasticApi.{
3538
avgAgg,
3639
bucketSelectorAggregation,
@@ -44,11 +47,14 @@ import com.sksamuel.elastic4s.ElasticApi.{
4447
valueCountAgg
4548
}
4649
import com.sksamuel.elastic4s.requests.script.Script
50+
import com.sksamuel.elastic4s.requests.searches.DateHistogramInterval
4751
import com.sksamuel.elastic4s.requests.searches.aggs.{
4852
Aggregation,
4953
CardinalityAggregation,
54+
DateHistogramAggregation,
5055
ExtendedStatsAggregation,
5156
FilterAggregation,
57+
HistogramOrder,
5258
NestedAggregation,
5359
StatsAggregation,
5460
TermsAggregation,
@@ -93,7 +99,10 @@ object ElasticAggregation {
9399
import sqlAgg._
94100
val sourceField = identifier.path
95101

96-
val direction = bucketsDirection.get(identifier.identifierName)
102+
val direction =
103+
bucketsDirection
104+
.get(identifier.identifierName)
105+
.orElse(bucketsDirection.get(identifier.aliasOrName))
97106

98107
val field = fieldAlias match {
99108
case Some(alias) => alias.alias
@@ -190,13 +199,13 @@ object ElasticAggregation {
190199
sort.order match {
191200
case Some(Desc) =>
192201
th.window match {
193-
case LAST_VALUE => FieldSort(sort.field).asc()
194-
case _ => FieldSort(sort.field).desc()
202+
case LAST_VALUE => FieldSort(sort.field.aliasOrName).asc()
203+
case _ => FieldSort(sort.field.aliasOrName).desc()
195204
}
196205
case _ =>
197206
th.window match {
198-
case LAST_VALUE => FieldSort(sort.field).desc()
199-
case _ => FieldSort(sort.field).asc()
207+
case LAST_VALUE => FieldSort(sort.field.aliasOrName).desc()
208+
case _ => FieldSort(sort.field.aliasOrName).asc()
200209
}
201210
}
202211
)
@@ -272,82 +281,152 @@ object ElasticAggregation {
272281
having: Option[Criteria],
273282
nested: Option[NestedElement],
274283
allElasticAggregations: Seq[ElasticAggregation]
275-
): Option[TermsAggregation] = {
276-
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
284+
): Option[Aggregation] = {
285+
var first = false
286+
val nbBuckets = buckets.size
287+
buckets.reverse.foldLeft(Option.empty[Aggregation]) { (current, bucket) =>
277288
// Determine the bucketPath of the current bucket
278289
val currentBucketPath = bucket.identifier.path
279290

280-
var agg = {
281-
bucketsDirection.get(bucket.identifier.identifierName) match {
282-
case Some(direction) =>
283-
termsAgg(bucket.name, currentBucketPath)
284-
.order(Seq(direction match {
285-
case Asc => TermsOrder("_key", asc = true)
286-
case _ => TermsOrder("_key", asc = false)
287-
}))
288-
case None =>
289-
termsAgg(bucket.name, currentBucketPath)
291+
val minDocCount =
292+
if ((first || current.isEmpty) && nbBuckets > 1) {
293+
0
294+
} else {
295+
first = true
296+
1
297+
}
298+
299+
var agg: Aggregation = {
300+
bucket.out match {
301+
case _: SQLTemporal =>
302+
val functions = bucket.identifier.functions
303+
val interval: Option[DateHistogramInterval] =
304+
if (functions.size == 1) {
305+
functions.head match {
306+
case trunc: DateTrunc =>
307+
trunc.unit match {
308+
case TimeUnit.YEARS => Option(DateHistogramInterval.Year)
309+
case TimeUnit.QUARTERS => Option(DateHistogramInterval.Quarter)
310+
case TimeUnit.MONTHS => Option(DateHistogramInterval.Month)
311+
case TimeUnit.WEEKS => Option(DateHistogramInterval.Week)
312+
case TimeUnit.DAYS => Option(DateHistogramInterval.Day)
313+
case TimeUnit.HOURS => Option(DateHistogramInterval.Hour)
314+
case TimeUnit.MINUTES => Option(DateHistogramInterval.Minute)
315+
case TimeUnit.SECONDS => Option(DateHistogramInterval.Second)
316+
case _ => None
317+
}
318+
case _ => None
319+
}
320+
} else {
321+
None
322+
}
323+
bucketsDirection.get(bucket.identifier.identifierName) match {
324+
case Some(direction) =>
325+
DateHistogramAggregation(bucket.name, calendarInterval = interval)
326+
.field(currentBucketPath)
327+
.minDocCount(minDocCount)
328+
.order(direction match {
329+
case Asc => HistogramOrder("_key", asc = true)
330+
case _ => HistogramOrder("_key", asc = false)
331+
})
332+
case _ =>
333+
DateHistogramAggregation(bucket.name, calendarInterval = interval)
334+
.field(currentBucketPath)
335+
.minDocCount(minDocCount)
336+
}
337+
case _ =>
338+
bucketsDirection.get(bucket.identifier.identifierName) match {
339+
case Some(direction) =>
340+
termsAgg(bucket.name, currentBucketPath)
341+
.minDocCount(minDocCount)
342+
.order(Seq(direction match {
343+
case Asc => TermsOrder("_key", asc = true)
344+
case _ => TermsOrder("_key", asc = false)
345+
}))
346+
case _ =>
347+
termsAgg(bucket.name, currentBucketPath)
348+
.minDocCount(minDocCount)
349+
}
290350
}
291351
}
292-
bucket.size.foreach(s => agg = agg.size(s))
293-
having match {
294-
case Some(criteria) =>
295-
criteria.includes(bucket, not = false, BucketIncludesExcludes()) match {
296-
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
297-
agg = agg.includeRegex(regex)
298-
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
299-
agg = agg.includeExactValues(values.toArray)
300-
case _ =>
301-
}
302-
criteria.excludes(bucket, not = false, BucketIncludesExcludes()) match {
303-
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
304-
agg = agg.excludeRegex(regex)
305-
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
306-
agg = agg.excludeExactValues(values.toArray)
352+
agg match {
353+
case termsAgg: TermsAggregation =>
354+
bucket.size.foreach(s => agg = termsAgg.size(s))
355+
having match {
356+
case Some(criteria) =>
357+
criteria.includes(bucket, not = false, BucketIncludesExcludes()) match {
358+
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
359+
agg = termsAgg.includeRegex(regex)
360+
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
361+
agg = termsAgg.includeExactValues(values.toArray)
362+
case _ =>
363+
}
364+
criteria.excludes(bucket, not = false, BucketIncludesExcludes()) match {
365+
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
366+
agg = termsAgg.excludeRegex(regex)
367+
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
368+
agg = termsAgg.excludeExactValues(values.toArray)
369+
case _ =>
370+
}
307371
case _ =>
308372
}
309373
case _ =>
310374
}
311375
current match {
312-
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
376+
case Some(subAgg) =>
377+
agg match {
378+
case termsAgg: TermsAggregation =>
379+
agg = termsAgg.subaggs(Seq(subAgg))
380+
case dateHistogramAgg: DateHistogramAggregation =>
381+
agg = dateHistogramAgg.subaggs(Seq(subAgg))
382+
case _ =>
383+
}
384+
Some(agg)
313385
case None =>
314-
val aggregationsWithOrder: Seq[TermsOrder] = aggregationsDirection.toSeq.map { kv =>
315-
kv._2 match {
316-
case Asc => TermsOrder(kv._1, asc = true)
317-
case _ => TermsOrder(kv._1, asc = false)
386+
val subaggs =
387+
having match {
388+
case Some(criteria) =>
389+
val script = metricSelectorForBucket(
390+
criteria,
391+
nested,
392+
allElasticAggregations
393+
)
394+
395+
if (script.nonEmpty) {
396+
val bucketSelector =
397+
bucketSelectorAggregation(
398+
"having_filter",
399+
Script(script),
400+
extractMetricsPathForBucket(
401+
criteria,
402+
nested,
403+
allElasticAggregations
404+
)
405+
)
406+
aggregations :+ bucketSelector
407+
} else {
408+
aggregations
409+
}
410+
case None =>
411+
aggregations
318412
}
319-
}
320-
val withAggregationOrders =
321-
if (aggregationsWithOrder.nonEmpty)
322-
agg.order(aggregationsWithOrder)
323-
else
324-
agg
325-
val withHaving = having match {
326-
case Some(criteria) =>
327-
val script = metricSelectorForBucket(
328-
criteria,
329-
nested,
330-
allElasticAggregations
331-
)
332413

333-
if (script.nonEmpty) {
334-
val bucketSelector =
335-
bucketSelectorAggregation(
336-
"having_filter",
337-
Script(script),
338-
extractMetricsPathForBucket(
339-
criteria,
340-
nested,
341-
allElasticAggregations
342-
)
343-
)
344-
withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
345-
} else {
346-
withAggregationOrders.copy(subaggs = aggregations)
414+
agg match {
415+
case termsAgg: TermsAggregation =>
416+
val aggregationsWithOrder: Seq[TermsOrder] = aggregationsDirection.toSeq.map { kv =>
417+
kv._2 match {
418+
case Asc => TermsOrder(kv._1, asc = true)
419+
case _ => TermsOrder(kv._1, asc = false)
420+
}
347421
}
348-
case None => withAggregationOrders.copy(subaggs = aggregations)
422+
if (aggregationsWithOrder.nonEmpty)
423+
agg = termsAgg.order(aggregationsWithOrder).copy(subaggs = subaggs)
424+
else
425+
agg = termsAgg.copy(subaggs = subaggs)
426+
case dateHistogramAggregation: DateHistogramAggregation =>
427+
agg = dateHistogramAggregation.copy(subaggs = subaggs)
349428
}
350-
Some(withHaving)
429+
Some(agg)
351430
}
352431
}
353432
}

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616

1717
package app.softnetwork.elastic.sql
1818

19-
import app.softnetwork.elastic.sql.`type`.{SQLBigInt, SQLDouble, SQLTemporal, SQLVarchar}
19+
import app.softnetwork.elastic.sql.`type`.{
20+
SQLBigInt,
21+
SQLDouble,
22+
SQLNumeric,
23+
SQLTemporal,
24+
SQLVarchar
25+
}
2026
import app.softnetwork.elastic.sql.function.aggregate.COUNT
2127
import app.softnetwork.elastic.sql.function.geo.{Distance, Meters}
2228
import app.softnetwork.elastic.sql.operator._
@@ -34,7 +40,7 @@ import com.sksamuel.elastic4s.requests.searches.aggs.{
3440
}
3541
import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery
3642
import com.sksamuel.elastic4s.requests.searches.queries.{InnerHit, Query}
37-
import com.sksamuel.elastic4s.requests.searches.sort.FieldSort
43+
import com.sksamuel.elastic4s.requests.searches.sort.{FieldSort, ScriptSort, ScriptSortType}
3844
import com.sksamuel.elastic4s.requests.searches.{
3945
MultiSearchRequest,
4046
SearchBodyBuilderFn,
@@ -457,7 +463,7 @@ package object bridge {
457463
_search
458464
}
459465

460-
_search = scriptFields.filterNot(_.aggregation) match {
466+
_search = scriptFields.filterNot(_.isAggregation) match {
461467
case Nil => _search
462468
case _ =>
463469
_search scriptfields scriptFields.map { field =>
@@ -478,17 +484,55 @@ package object bridge {
478484

479485
_search = orderBy match {
480486
case Some(o) if aggregates.isEmpty && buckets.isEmpty =>
481-
_search sortBy o.sorts.map(sort =>
482-
sort.order match {
483-
case Some(Desc) => FieldSort(sort.field).desc()
484-
case _ => FieldSort(sort.field).asc()
487+
_search sortBy o.sorts.map { sort =>
488+
if (sort.isScriptSort) {
489+
val context = PainlessContext()
490+
val painless = sort.field.painless(Some(context))
491+
val painlessScript = s"$context$painless"
492+
val script =
493+
sort.out match {
494+
case _: SQLTemporal if !painless.endsWith("toEpochMilli()") =>
495+
val parts = painlessScript.split(";").toSeq
496+
if (parts.size > 1) {
497+
val lastPart = parts.last.trim.stripPrefix("return ")
498+
if (lastPart.split(" ").toSeq.size == 1) {
499+
val newLastPart =
500+
s"""($lastPart != null) ? $lastPart.toInstant().toEpochMilli() : null"""
501+
s"${parts.dropRight(1).mkString(";")}; return $newLastPart"
502+
} else {
503+
painlessScript
504+
}
505+
} else {
506+
s"$painlessScript.toInstant().toEpochMilli()"
507+
}
508+
case _ => painlessScript
509+
}
510+
val scriptSort =
511+
ScriptSort(
512+
script = Script(script = script)
513+
.lang("painless")
514+
.scriptType(Source),
515+
scriptSortType = sort.field.out match {
516+
case _: SQLTemporal | _: SQLNumeric => ScriptSortType.Number
517+
case _ => ScriptSortType.String
518+
}
519+
)
520+
sort.order match {
521+
case Some(Desc) => scriptSort.desc()
522+
case _ => scriptSort.asc()
523+
}
524+
} else {
525+
sort.order match {
526+
case Some(Desc) => FieldSort(sort.field.aliasOrName).desc()
527+
case _ => FieldSort(sort.field.aliasOrName).asc()
528+
}
485529
}
486-
)
530+
}
487531
case _ => _search
488532
}
489533

490534
if (allAggregations.nonEmpty && fields.isEmpty) {
491-
_search size 0
535+
_search size 0 fetchSource false
492536
} else {
493537
limit match {
494538
case Some(l) => _search limit l.limit from l.offset.map(_.offset).getOrElse(0)
@@ -512,7 +556,7 @@ package object bridge {
512556

513557
implicit def expressionToQuery(expression: GenericExpression): Query = {
514558
import expression._
515-
if (aggregation)
559+
if (isAggregation)
516560
return matchAllQuery()
517561
if (
518562
identifier.functions.nonEmpty && (identifier.functions.size > 1 || (identifier.functions.head match {

0 commit comments

Comments
 (0)