Skip to content

Commit 0ca06f9

Browse files
committed
add support for multiple bucket trees (for windowing) and add aggregations to the corresponding bucket when required
1 parent 8dc0082 commit 0ca06f9

File tree

12 files changed

+780
-454
lines changed

12 files changed

+780
-454
lines changed

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 203 additions & 181 deletions
Large diffs are not rendered by default.

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ package object bridge {
151151
): Seq[AbstractAggregation] = {
152152
val notNestedAggregations = aggregations.filterNot(_.nested)
153153

154-
val notNestedBuckets = request.buckets.filterNot(_.nested)
154+
val notNestedBuckets = request.bucketTree.filterNot(_.bucket.nested)
155155

156156
val rootAggregations = notNestedAggregations match {
157157
case Nil =>
@@ -164,8 +164,8 @@ package object bridge {
164164
None,
165165
aggregations
166166
) match {
167-
case Some(b) => Seq(b)
168-
case _ => Seq.empty
167+
case Nil => Seq.empty
168+
case aggs => aggs
169169
}
170170
buckets
171171
case aggs =>
@@ -179,14 +179,18 @@ package object bridge {
179179
val buckets = ElasticAggregation.buildBuckets(
180180
notNestedBuckets,
181181
request.sorts -- directions.keys,
182-
aggregations,
182+
aggs,
183183
directions,
184184
request.having.flatMap(_.criteria),
185185
None,
186186
aggs
187187
) match {
188-
case Some(b) => Seq(b)
189-
case _ => aggregations
188+
case Nil => aggs.map(_.agg)
189+
case aggs =>
190+
if (request.groupBy.isEmpty && request.windowFunctions.exists(_.isWindowing))
191+
notNestedAggregations.filter(_.bucketPath.isEmpty).map(_.agg) ++ aggs
192+
else
193+
aggs
190194
}
191195
buckets
192196
}
@@ -210,12 +214,14 @@ package object bridge {
210214

211215
// Group nested buckets by their nested path
212216
val nestedGroupedBuckets =
213-
request.buckets
214-
.filter(_.nested)
215-
.groupBy(
216-
_.nestedBucket.getOrElse(
217-
throw new IllegalArgumentException(
218-
"Nested bucket must have a nested element"
217+
request.bucketTree
218+
.filter(_.bucket.nested)
219+
.map(tree =>
220+
tree.groupBy(
221+
_.bucket.nestedBucket.getOrElse(
222+
throw new IllegalArgumentException(
223+
"Nested bucket must have a nested element"
224+
)
219225
)
220226
)
221227
)
@@ -235,17 +241,16 @@ package object bridge {
235241

236242
// Get the buckets for this nested element
237243
val nestedBuckets =
238-
nestedGroupedBuckets.getOrElse(n.innerHitsName, Seq.empty)
244+
nestedGroupedBuckets.map(_.getOrElse(n.innerHitsName, Seq.empty))
239245

240246
val notRelatedAggregationsToBuckets = elasticAggregations
241247
.filterNot { ea =>
242-
nestedBuckets.exists(nb => nb.identifier.path == ea.sourceField)
248+
nestedBuckets.flatten.exists(nb => nb.bucket.identifier.path == ea.sourceField)
243249
}
244-
.map(_.agg)
245250

246251
val relatedAggregationsToBuckets = elasticAggregations
247252
.filter { ea =>
248-
nestedBuckets.exists(nb => nb.identifier.path == ea.sourceField)
253+
nestedBuckets.flatten.exists(nb => nb.bucket.identifier.path == ea.sourceField)
249254
}
250255
.map(_.agg)
251256

@@ -273,8 +278,12 @@ package object bridge {
273278
Some(n),
274279
aggregations
275280
) match {
276-
case Some(b) => Seq(b)
277-
case _ => notRelatedAggregationsToBuckets
281+
case Nil => notRelatedAggregationsToBuckets.map(_.agg)
282+
case aggs =>
283+
if (request.groupBy.isEmpty && request.windowFunctions.exists(_.isWindowing))
284+
notRelatedAggregationsToBuckets.filter(_.bucketPath.isEmpty).map(_.agg) ++ aggs
285+
else
286+
aggs
278287
}
279288

280289
val children = n.children

core/src/main/scala/app/softnetwork/elastic/client/SearchApi.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
6868
collection.immutable.Seq(single.sources: _*),
6969
sql = Some(sql.query)
7070
)
71-
if (single.windowBuckets.nonEmpty)
71+
if (single.windowFunctions.exists(_.isWindowing))
7272
searchWithWindowEnrichment(sql, single)
7373
else
7474
singleSearch(elasticQuery, single.fieldAliases, single.sqlAggregations)
@@ -131,7 +131,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
131131
val query = elasticQuery.query
132132
val indices = elasticQuery.indices.mkString(",")
133133

134-
logger.debug(
134+
logger.info(
135135
s"🔍 Searching with query \n$elasticQuery\nin indices '$indices'"
136136
)
137137

@@ -1168,7 +1168,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
11681168
request
11691169
.copy(
11701170
select = request.select.copy(fields = request.windowFields),
1171-
groupBy = request.groupBy.map(_.copy(buckets = request.windowBuckets)),
1171+
groupBy = None, //request.groupBy.map(_.copy(buckets = request.windowBuckets)),
11721172
orderBy = None, // Not needed for aggregations
11731173
limit = None // Need all buckets
11741174
)
@@ -1236,7 +1236,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
12361236
): SQLSearchRequest = {
12371237

12381238
// Remove window function fields from SELECT
1239-
val baseFields = request.select.fields.filterNot(_.windows.nonEmpty)
1239+
val baseFields = request.select.fields.filterNot(_.identifier.hasWindow)
12401240

12411241
// Create modified request
12421242
val baseRequest = request

core/src/main/scala/app/softnetwork/elastic/client/package.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ package object client extends SerializationApi {
168168
aggType: AggregationType.AggregationType,
169169
distinct: Boolean,
170170
sourceField: String,
171-
windowing: Boolean
171+
windowing: Boolean,
172+
bucketPath: String
172173
) {
173174
def multivalued: Boolean = aggType == AggregationType.ArrayAgg
174175
def singleValued: Boolean = !multivalued
@@ -191,7 +192,8 @@ package object client extends SerializationApi {
191192
aggType,
192193
agg.distinct,
193194
agg.sourceField,
194-
agg.aggType.isWindowing
195+
agg.aggType.isWindowing,
196+
agg.bucketPath
195197
)
196198
}
197199
}

core/src/test/scala/app/softnetwork/elastic/client/ElasticConversionSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ class ElasticConversionSpec extends AnyFlatSpec with Matchers with ElasticConver
190190
aggType = AggregationType.ArrayAgg,
191191
distinct = false,
192192
"name",
193-
windowing = true
193+
windowing = true,
194+
""
194195
)
195196
)
196197
) match {
@@ -643,7 +644,8 @@ class ElasticConversionSpec extends AnyFlatSpec with Matchers with ElasticConver
643644
aggType = AggregationType.ArrayAgg,
644645
distinct = false,
645646
"name",
646-
windowing = true
647+
windowing = true,
648+
""
647649
)
648650
)
649651
) match {

0 commit comments

Comments
 (0)