Skip to content

Commit 96d8c4e

Browse files
committed
add full support for window functions
1 parent 4b421ff commit 96d8c4e

File tree

11 files changed

+2036
-4
lines changed

11 files changed

+2036
-4
lines changed

core/src/main/scala/app/softnetwork/elastic/client/ScrollApi.scala

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package app.softnetwork.elastic.client
1919
import akka.NotUsed
2020
import akka.actor.ActorSystem
2121
import akka.stream.scaladsl.{Sink, Source}
22+
import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess}
2223
import app.softnetwork.elastic.client.scroll.{
2324
ScrollConfig,
2425
ScrollMetrics,
@@ -28,11 +29,11 @@ import app.softnetwork.elastic.client.scroll.{
2829
UseSearchAfter
2930
}
3031
import app.softnetwork.elastic.sql.macros.SQLQueryMacros
31-
import app.softnetwork.elastic.sql.query.{SQLAggregation, SQLQuery}
32+
import app.softnetwork.elastic.sql.query.{SQLAggregation, SQLQuery, SQLSearchRequest}
3233
import org.json4s.{Formats, JNothing}
3334
import org.json4s.jackson.JsonMethods.parse
3435

35-
import scala.concurrent.{ExecutionContext, Promise}
36+
import scala.concurrent.{ExecutionContext, Future, Promise}
3637
import scala.language.experimental.macros
3738
import scala.util.{Failure, Success}
3839

@@ -121,6 +122,14 @@ trait ScrollApi extends ElasticClientHelpers {
121122
)(implicit system: ActorSystem): Source[(Map[String, Any], ScrollMetrics), NotUsed] = {
122123
sql.request match {
123124
case Some(Left(single)) =>
125+
if (
126+
single.windowFunctions.nonEmpty && (single.fields.nonEmpty || single.windowFunctions
127+
.flatMap(_.fields)
128+
.distinct
129+
.size > 1)
130+
)
131+
return scrollWithWindowEnrichment(sql, single, config)
132+
124133
val sqlRequest = single.copy(score = sql.score)
125134
val elasticQuery =
126135
ElasticQuery(sqlRequest, collection.immutable.Seq(sqlRequest.sources: _*))
@@ -365,4 +374,73 @@ trait ScrollApi extends ElasticClientHelpers {
365374
}
366375
}
367376

377+
// ========================================================================
378+
// WINDOW FUNCTION SEARCH
379+
// ========================================================================
380+
381+
/** Scroll with window function enrichment
382+
*/
383+
private def scrollWithWindowEnrichment(
384+
sql: SQLQuery,
385+
request: SQLSearchRequest,
386+
config: ScrollConfig
387+
)(implicit system: ActorSystem): Source[(Map[String, Any], ScrollMetrics), NotUsed] = {
388+
389+
implicit val ec: ExecutionContext = system.dispatcher
390+
391+
logger.info(s"🪟 Scrolling with ${request.windowFunctions.size} window functions")
392+
393+
// Execute window aggregations first
394+
val windowCacheFuture: Future[ElasticResult[WindowCache]] =
395+
Future(executeWindowAggregations(request))
396+
397+
// Create base query without window functions
398+
val baseQuery = createBaseQuery(sql, request)
399+
400+
// Stream and enrich
401+
Source
402+
.futureSource(
403+
windowCacheFuture.map {
404+
case ElasticSuccess(cache) =>
405+
scrollWithMetrics(
406+
ElasticQuery(
407+
baseQuery,
408+
collection.immutable.Seq(baseQuery.sources: _*)
409+
),
410+
baseQuery.fieldAliases,
411+
baseQuery.sqlAggregations,
412+
config,
413+
baseQuery.sorts.nonEmpty
414+
)
415+
.map { case (doc, metrics) =>
416+
val enrichedDoc = enrichDocumentWithWindowValues(doc, cache, request)
417+
(enrichedDoc, metrics)
418+
}
419+
420+
case ElasticFailure(error) =>
421+
logger.error(s"❌ Failed to compute window functions: ${error.message}")
422+
if (config.failOnWindowError.getOrElse(false)) {
423+
// Strict mode: propagate the error
424+
Source.failed(
425+
new RuntimeException(s"Window function computation failed: ${error.message}")
426+
)
427+
} else {
428+
// Fallback: return base results without enrichment
429+
logger.warn("⚠️ Falling back to base results without window enrichment")
430+
scrollWithMetrics(
431+
ElasticQuery(
432+
baseQuery,
433+
collection.immutable.Seq(baseQuery.sources: _*)
434+
),
435+
baseQuery.fieldAliases,
436+
baseQuery.sqlAggregations,
437+
config,
438+
baseQuery.sorts.nonEmpty
439+
)
440+
}
441+
}
442+
)
443+
.mapMaterializedValue(_ => NotUsed)
444+
}
445+
368446
}

0 commit comments

Comments
 (0)