Skip to content

Commit e73b7d9

Browse files
committed
update elastic conversion in order to load aggs top hits
1 parent 393cc62 commit e73b7d9

File tree

1 file changed

+127
-70
lines changed

1 file changed

+127
-70
lines changed

core/src/main/scala/app/softnetwork/elastic/client/ElasticConversion.scala

Lines changed: 127 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,17 @@ trait ElasticConversion {
155155
parseAggregations(aggs, Map.empty, fieldAliases, aggregations)
156156

157157
case (Some(hits), Some(aggs)) if hits.nonEmpty =>
158-
// Case 4 : Hits + global aggregations
158+
// Case 4 : Hits + global aggregations + top_hits aggregations
159159
val globalMetrics = extractGlobalMetrics(aggs)
160+
val allTopHits = extractAggregationValues(
161+
extractAllTopHits(aggs, fieldAliases, aggregations),
162+
aggregations
163+
)
160164
hits.map { hit =>
161165
val source = extractSource(hit, fieldAliases)
162166
val metadata = extractHitMetadata(hit)
163167
val innerHits = extractInnerHits(hit, fieldAliases)
164-
globalMetrics ++ source ++ metadata ++ innerHits
168+
globalMetrics ++ allTopHits ++ source ++ metadata ++ innerHits
165169
}
166170

167171
case _ =>
@@ -352,69 +356,18 @@ trait ElasticConversion {
352356
} else if (bucketAggs.isEmpty) {
353357
// No buckets : it is a leaf aggregation (metrics or top_hits)
354358
val metrics = extractMetrics(aggsNode)
355-
val allTopHits = extractAllTopHits(aggsNode)
359+
val allTopHits = extractAllTopHits(aggsNode, fieldAliases, aggregations)
356360

357361
if (allTopHits.nonEmpty) {
358-
// Process each top_hits aggregation with their names
359-
val topHitsData = allTopHits.map { case (topHitName, hits) =>
360-
// Determine if it is a multivalued aggregation (array_agg, ...)
361-
val hasMultipleValues = aggregations.get(topHitName) match {
362-
case Some(agg) => agg.multivalued
363-
case None =>
364-
// Fallback on naming convention if aggregation is not found
365-
!topHitName.toLowerCase.matches("(first|last)_.*")
366-
}
367-
368-
val processedHits = hits.map { hit =>
369-
val source = extractSource(hit, fieldAliases)
370-
if (hasMultipleValues) {
371-
source.size match {
372-
case 0 => null
373-
case 1 =>
374-
// If only one field in source and multivalued, return the value directly
375-
val value = source.head._2
376-
value match {
377-
case list: List[_] => list
378-
case map: Map[_, _] => map
379-
case other => other
380-
}
381-
case _ =>
382-
// Multiple fields: return as object
383-
val metadata = extractHitMetadata(hit)
384-
val innerHits = extractInnerHits(hit, fieldAliases)
385-
source ++ metadata ++ innerHits
386-
}
387-
} else {
388-
val metadata = extractHitMetadata(hit)
389-
val innerHits = extractInnerHits(hit, fieldAliases)
390-
source ++ metadata ++ innerHits
391-
}
392-
}
393-
394-
// If multipleValues = true OR more than one hit, return a list
395-
// If multipleValues = false AND only one hit, return an object
396-
topHitName -> {
397-
if (!hasMultipleValues && processedHits.size == 1)
398-
processedHits.head
399-
else {
400-
if (aggregations.get(topHitName).exists(_.distinct))
401-
processedHits.distinct
402-
else
403-
processedHits
404-
}
405-
}
406-
}
407-
408-
Seq(parentContext ++ metrics ++ topHitsData)
409-
362+
Seq(parentContext ++ metrics ++ allTopHits)
410363
} else if (metrics.nonEmpty || parentContext.nonEmpty) {
411364
Seq(parentContext ++ metrics)
412365
} else {
413366
Seq.empty
414367
}
415368
} else {
416369
// Handle each aggregation with buckets
417-
bucketAggs.flatMap { case (aggName, buckets, aggValue) =>
370+
bucketAggs.flatMap { case (aggName, buckets, _) =>
418371
buckets.flatMap { bucket =>
419372
val bucketKey = extractBucketKey(bucket)
420373
val docCount = Option(bucket.get("doc_count"))
@@ -572,23 +525,80 @@ trait ElasticConversion {
572525
}
573526

574527
/** Extract all top_hits aggregations with their names and hits */
575-
def extractAllTopHits(aggsNode: JsonNode): Map[String, Seq[JsonNode]] = {
528+
def extractAllTopHits(
529+
aggsNode: JsonNode,
530+
fieldAliases: Map[String, String],
531+
aggregations: Map[String, ClientAggregation]
532+
): Map[String, Any] = {
576533
if (!aggsNode.isObject) return Map.empty
577-
aggsNode
578-
.properties()
579-
.asScala
580-
.collect {
581-
case entry if entry.getValue.has("hits") =>
582-
val normalizedKey = normalizeAggregationKey(entry.getKey)
583-
val hitsNode = entry.getValue.path("hits").path("hits")
584-
val hits = if (hitsNode.isArray) {
585-
hitsNode.elements().asScala.toSeq
586-
} else {
587-
Seq.empty
534+
val allTopHits =
535+
aggsNode
536+
.properties()
537+
.asScala
538+
.collect {
539+
case entry if entry.getValue.has("hits") =>
540+
val normalizedKey = normalizeAggregationKey(entry.getKey)
541+
val hitsNode = entry.getValue.path("hits").path("hits")
542+
val hits = if (hitsNode.isArray) {
543+
hitsNode.elements().asScala.toSeq
544+
} else {
545+
Seq.empty
546+
}
547+
normalizedKey -> hits
548+
}
549+
.toMap
550+
551+
// Process each top_hits aggregation with their names
552+
val row = allTopHits.map { case (topHitName, hits) =>
553+
// Determine if it is a multivalued aggregation (array_agg, ...)
554+
val hasMultipleValues = aggregations.get(topHitName) match {
555+
case Some(agg) => agg.multivalued
556+
case None =>
557+
// Fallback on naming convention if aggregation is not found
558+
!topHitName.toLowerCase.matches("(first|last)_.*")
559+
}
560+
561+
val processedHits = hits.map { hit =>
562+
val source = extractSource(hit, fieldAliases)
563+
if (hasMultipleValues) {
564+
source.size match {
565+
case 0 => null
566+
case 1 =>
567+
// If only one field in source and multivalued, return the value directly
568+
val value = source.head._2
569+
value match {
570+
case list: List[_] => list
571+
case map: Map[_, _] => map
572+
case other => other
573+
}
574+
case _ =>
575+
// Multiple fields: return as object
576+
val metadata = extractHitMetadata(hit)
577+
val innerHits = extractInnerHits(hit, fieldAliases)
578+
source ++ metadata ++ innerHits
588579
}
589-
normalizedKey -> hits
580+
} else {
581+
val metadata = extractHitMetadata(hit)
582+
val innerHits = extractInnerHits(hit, fieldAliases)
583+
source ++ metadata ++ innerHits
584+
}
590585
}
591-
.toMap
586+
587+
// If multipleValues = true OR more than one hit, return a list
588+
// If multipleValues = false AND only one hit, return an object
589+
topHitName -> {
590+
if (!hasMultipleValues && processedHits.size == 1)
591+
processedHits.head
592+
else {
593+
if (aggregations.get(topHitName).exists(_.distinct))
594+
processedHits.distinct
595+
else
596+
processedHits
597+
}
598+
}
599+
}
600+
601+
row
592602
}
593603

594604
/** Extract global metrics from aggregations (for hits + aggs case)
@@ -622,6 +632,53 @@ trait ElasticConversion {
622632
.toMap
623633
}
624634

635+
def extractAggregationValues(
636+
row: Map[String, Any],
637+
aggregations: Map[String, ClientAggregation]
638+
): Map[String, Any] = {
639+
val values = aggregations
640+
.map { wf =>
641+
val fieldName = wf._1
642+
643+
val aggType = wf._2.aggType
644+
645+
val sourceField = wf._2.sourceField
646+
647+
// Get value from row (already processed by ElasticConversion)
648+
val value = row.get(fieldName).orElse {
649+
None
650+
}
651+
652+
val validatedValue =
653+
value match {
654+
case Some(m: Map[String, Any]) =>
655+
m.get(sourceField) match {
656+
case Some(v) =>
657+
aggType match {
658+
case AggregationType.ArrayAgg =>
659+
v match {
660+
case l: List[_] =>
661+
Some(l)
662+
case other =>
663+
Some(List(other)) // Wrap into a List
664+
}
665+
case _ => Some(v)
666+
}
667+
case None =>
668+
None
669+
}
670+
case other =>
671+
other
672+
}
673+
674+
fieldName -> validatedValue
675+
}
676+
.collect { case (name, Some(value)) =>
677+
name -> value
678+
}
679+
values
680+
}
681+
625682
/** Convert recursively a JsonNode to Map
626683
*/
627684
def jsonNodeToMap(node: JsonNode, fieldAliases: Map[String, String]): Map[String, Any] = {

0 commit comments

Comments
 (0)