Skip to content

Commit 081027a

Browse files
committed
add flag indicating whether or not an aggregate is a window function with partitioning
1 parent 6555fef commit 081027a

File tree

7 files changed

+32
-72
lines changed

7 files changed

+32
-72
lines changed

core/src/main/scala/app/softnetwork/elastic/client/ScrollApi.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,7 @@ trait ScrollApi extends ElasticClientHelpers {
122122
)(implicit system: ActorSystem): Source[(Map[String, Any], ScrollMetrics), NotUsed] = {
123123
sql.request match {
124124
case Some(Left(single)) =>
125-
if (
126-
single.windowFunctions.nonEmpty && (single.fields.nonEmpty || single.windowFunctions
127-
.flatMap(_.fields)
128-
.distinct
129-
.size > 1)
130-
)
125+
if (single.windowFunctions.nonEmpty)
131126
return scrollWithWindowEnrichment(sql, single, config)
132127

133128
val sqlRequest = single.copy(score = sql.score)

core/src/main/scala/app/softnetwork/elastic/client/SearchApi.scala

Lines changed: 4 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
6868
collection.immutable.Seq(single.sources: _*),
6969
sql = Some(sql.query)
7070
)
71-
if (
72-
single.windowFunctions.nonEmpty && (single.fields.nonEmpty || single.windowFunctions
73-
.flatMap(_.fields)
74-
.distinct
75-
.size > 1)
76-
)
71+
if (single.windowBuckets.nonEmpty)
7772
searchWithWindowEnrichment(sql, single)
7873
else
7974
singleSearch(elasticQuery, single.fieldAliases, single.sqlAggregations)
@@ -829,7 +824,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
829824
s"✅ Successfully executed search with inner hits in indices '${elasticQuery.indices.mkString(",")}'"
830825
)
831826
ElasticResult.attempt {
832-
new JsonParser().parse(response).getAsJsonObject
827+
JsonParser.parseString(response).getAsJsonObject
833828
} match {
834829
case ElasticFailure(error) =>
835830
logger.error(
@@ -918,7 +913,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
918913
s"✅ Successfully executed multi-search inner hits with ${elasticQueries.queries.size} queries"
919914
)
920915
ElasticResult.attempt {
921-
new JsonParser().parse(response).getAsJsonObject
916+
JsonParser.parseString(response).getAsJsonObject
922917
} match {
923918
case ElasticFailure(error) =>
924919
logger.error(
@@ -1287,51 +1282,7 @@ trait SearchApi extends ElasticConversion with ElasticClientHelpers {
12871282
aggregations: Map[String, ClientAggregation]
12881283
): WindowValues = {
12891284

1290-
val values = aggregations
1291-
.filter(_._2.window)
1292-
.map { wf =>
1293-
val fieldName = wf._1
1294-
1295-
val aggType = wf._2.aggType
1296-
1297-
val sourceField = wf._2.sourceField
1298-
1299-
// Get value from row (already processed by ElasticConversion)
1300-
val value = row.get(fieldName).orElse {
1301-
logger.warn(s"⚠️ Window function '$fieldName' not found in aggregation result")
1302-
None
1303-
}
1304-
1305-
val validatedValue =
1306-
value match {
1307-
case Some(m: Map[String, Any]) =>
1308-
m.get(sourceField) match {
1309-
case Some(v) =>
1310-
aggType match {
1311-
case AggregationType.ArrayAgg =>
1312-
v match {
1313-
case l: List[_] =>
1314-
Some(l)
1315-
case other =>
1316-
logger.warn(
1317-
s"⚠️ Expected List for ARRAY_AGG '$fieldName', got ${other.getClass.getSimpleName}"
1318-
)
1319-
Some(List(other)) // Wrap into a List
1320-
}
1321-
case _ => Some(v)
1322-
}
1323-
case None =>
1324-
None
1325-
}
1326-
case other =>
1327-
other
1328-
}
1329-
1330-
fieldName -> validatedValue
1331-
}
1332-
.collect { case (name, Some(value)) =>
1333-
name -> value
1334-
}
1285+
val values = extractAggregationValues(row, aggregations)
13351286

13361287
WindowValues(values)
13371288
}

core/src/main/scala/app/softnetwork/elastic/client/package.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,17 @@ package object client extends SerializationApi {
137137
* @param distinct
138138
* - when the aggregation is multivalued define if its values should be returned distinct or
139139
* not
140+
* @param sourceField
141+
* - the source field of the aggregation
142+
* @param windowing
143+
* - whether the aggregation is a window function with partitioning
140144
*/
141145
case class ClientAggregation(
142146
aggName: String,
143147
aggType: AggregationType.AggregationType,
144148
distinct: Boolean,
145149
sourceField: String,
146-
window: Boolean
150+
windowing: Boolean
147151
) {
148152
def multivalued: Boolean = aggType == AggregationType.ArrayAgg
149153
def singleValued: Boolean = !multivalued
@@ -166,7 +170,7 @@ package object client extends SerializationApi {
166170
aggType,
167171
agg.distinct,
168172
agg.sourceField,
169-
agg.aggType.isInstanceOf[WindowFunction]
173+
agg.aggType.isWindowing
170174
)
171175
}
172176
}

core/src/test/scala/app/softnetwork/elastic/client/ElasticConversionSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class ElasticConversionSpec extends AnyFlatSpec with Matchers with ElasticConver
190190
aggType = AggregationType.ArrayAgg,
191191
distinct = false,
192192
"name",
193-
window = true
193+
windowing = true
194194
)
195195
)
196196
) match {
@@ -643,7 +643,7 @@ class ElasticConversionSpec extends AnyFlatSpec with Matchers with ElasticConver
643643
aggType = AggregationType.ArrayAgg,
644644
distinct = false,
645645
"name",
646-
window = true
646+
windowing = true
647647
)
648648
)
649649
) match {

sql/src/main/scala/app/softnetwork/elastic/sql/function/aggregate/package.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ package object aggregate {
2929
override def hasAggregation: Boolean = true
3030

3131
def isBucketScript: Boolean = false
32+
33+
/** Indicates whether this aggregation is a windowing function with partitioning or not
34+
*/
35+
def isWindowing: Boolean = false
3236
}
3337

3438
case object COUNT extends Expr("COUNT") with AggregateFunction
@@ -97,6 +101,8 @@ package object aggregate {
97101
def window: Window
98102
def limit: Option[Limit]
99103

104+
override def isWindowing: Boolean = buckets.nonEmpty
105+
100106
lazy val buckets: Seq[Bucket] = partitionBy.map(identifier => Bucket(identifier, None))
101107

102108
lazy val bucketNames: Map[String, Bucket] = buckets.map { b =>
@@ -120,12 +126,16 @@ package object aggregate {
120126
val updated = this
121127
.withPartitionBy(partitionBy = partitionBy.map(_.update(request)))
122128
updated.withFields(
123-
fields = request.select.fields
124-
.filterNot(field =>
125-
field.isAggregation || request.bucketNames.keys.toSeq
126-
.contains(field.identifier.identifierName)
127-
)
128-
.filterNot(f => request.excludes.contains(f.sourceField))
129+
fields = if (isWindowing) {
130+
request.select.fields
131+
.filterNot(field =>
132+
field.isAggregation || request.bucketNames.keys.toSeq
133+
.contains(field.identifier.identifierName)
134+
)
135+
.filterNot(f => request.excludes.contains(f.sourceField))
136+
} else {
137+
updated.fields
138+
}
129139
)
130140
}
131141
}

sql/src/main/scala/app/softnetwork/elastic/sql/query/SQLSearchRequest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ case class SQLSearchRequest(
122122
}
123123

124124
lazy val fields: Seq[String] = {
125-
if (aggregates.isEmpty && buckets.isEmpty)
125+
if (buckets.isEmpty)
126126
select.fields
127127
.filterNot(_.isScriptField)
128128
.filterNot(_.nested)
@@ -140,7 +140,7 @@ case class SQLSearchRequest(
140140
lazy val aggregates: Seq[Field] =
141141
select.fields
142142
.filter(f => f.isAggregation || f.isBucketScript)
143-
.filterNot(_.windows.isDefined) ++ windowFields
143+
.filterNot(_.isWindow) ++ windowFields
144144

145145
lazy val sqlAggregations: Map[String, SQLAggregation] =
146146
aggregates.flatMap(f => SQLAggregation.fromField(f, this)).map(a => a.aggName -> a).toMap

sql/src/main/scala/app/softnetwork/elastic/sql/query/Select.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ case class Field(
7373
lazy val windows: Option[WindowFunction] =
7474
functions.collectFirst { case th: WindowFunction => th }
7575

76-
def isWindow: Boolean = windows.isDefined
76+
def isWindow: Boolean = windows.nonEmpty //.exists(_.partitionBy.nonEmpty)
7777

7878
def update(request: SQLSearchRequest): Field = {
7979
windows match {

0 commit comments

Comments
 (0)