Skip to content

Commit 4d49bad

Browse files
committed
add specifications for window functions
1 parent 1097401 commit 4d49bad

File tree

1 file changed

+117
-0
lines changed

1 file changed

+117
-0
lines changed

testkit/src/main/scala/app/softnetwork/elastic/client/WindowFunctionSpec.scala

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,4 +1547,121 @@ trait WindowFunctionSpec
15471547
}
15481548
}
15491549

1550+
"Search and Scroll APIs with distinct window partitions" should "maintain consistency between them" in {
1551+
// Search
1552+
val searchResults = client.searchAs[EmployeeDistinctPartitions]("""
1553+
SELECT
1554+
department,
1555+
location,
1556+
name,
1557+
salary,
1558+
hire_date,
1559+
FIRST_VALUE(salary) OVER (
1560+
PARTITION BY department, location
1561+
ORDER BY hire_date ASC
1562+
) AS first_in_dept_loc,
1563+
LAST_VALUE(salary) OVER (
1564+
PARTITION BY department, location
1565+
ORDER BY hire_date ASC
1566+
) AS last_in_dept_loc,
1567+
FIRST_VALUE(salary) OVER (
1568+
PARTITION BY department
1569+
ORDER BY hire_date ASC
1570+
) AS first_in_dept,
1571+
LAST_VALUE(salary) OVER (
1572+
PARTITION BY department
1573+
ORDER BY hire_date ASC
1574+
) AS last_in_dept
1575+
FROM emp
1576+
WHERE department IN ('Engineering', 'Sales')
1577+
LIMIT 20
1578+
""") match {
1579+
case ElasticSuccess(emps) => emps
1580+
case ElasticFailure(error) => fail(s"Search failed: ${error.message}")
1581+
}
1582+
1583+
// Scroll
1584+
val config = ScrollConfig(scrollSize = 4)
1585+
val futureScrollResults = client
1586+
.scrollAs[EmployeeDistinctPartitions](
1587+
"""
1588+
SELECT
1589+
department,
1590+
location,
1591+
name,
1592+
salary,
1593+
hire_date,
1594+
FIRST_VALUE(salary) OVER (
1595+
PARTITION BY department, location
1596+
ORDER BY hire_date ASC
1597+
) AS first_in_dept_loc,
1598+
LAST_VALUE(salary) OVER (
1599+
PARTITION BY department, location
1600+
ORDER BY hire_date ASC
1601+
) AS last_in_dept_loc,
1602+
FIRST_VALUE(salary) OVER (
1603+
PARTITION BY department
1604+
ORDER BY hire_date ASC
1605+
) AS first_in_dept,
1606+
LAST_VALUE(salary) OVER (
1607+
PARTITION BY department
1608+
ORDER BY hire_date ASC
1609+
) AS last_in_dept
1610+
FROM emp
1611+
WHERE department IN ('Engineering', 'Sales')
1612+
LIMIT 20
1613+
""",
1614+
config
1615+
)
1616+
.runWith(Sink.seq)
1617+
1618+
val scrollResults = Await.result(futureScrollResults, 30.seconds).map(_._1)
1619+
1620+
log.info(s"\n=== Comparing Search vs Scroll for distinct partitions ===")
1621+
log.info(s" Search: ${searchResults.size} results")
1622+
log.info(s" Scroll: ${scrollResults.size} results")
1623+
1624+
searchResults.size shouldBe scrollResults.size
1625+
1626+
// Compare Window 1: (dept, loc)
1627+
val searchByDeptLoc = searchResults.groupBy(e => (e.department, e.location))
1628+
val scrollByDeptLoc = scrollResults.groupBy(e => (e.department, e.location))
1629+
1630+
searchByDeptLoc.keys shouldBe scrollByDeptLoc.keys
1631+
1632+
log.info("\n--- Window 1: PARTITION BY (department, location) ---")
1633+
searchByDeptLoc.foreach { case (key @ (dept, loc), searchEmps) =>
1634+
val scrollEmps = scrollByDeptLoc(key)
1635+
1636+
val searchFirst = searchEmps.flatMap(_.first_in_dept_loc).distinct.head
1637+
val scrollFirst = scrollEmps.flatMap(_.first_in_dept_loc).distinct.head
1638+
val searchLast = searchEmps.flatMap(_.last_in_dept_loc).distinct.head
1639+
val scrollLast = scrollEmps.flatMap(_.last_in_dept_loc).distinct.head
1640+
1641+
searchFirst shouldBe scrollFirst
1642+
searchLast shouldBe scrollLast
1643+
1644+
log.info(s" ✓ ($dept, $loc): FIRST=$searchFirst, LAST=$searchLast (consistent)")
1645+
}
1646+
1647+
// Compare Window 2: (dept)
1648+
val searchByDept = searchResults.groupBy(_.department)
1649+
val scrollByDept = scrollResults.groupBy(_.department)
1650+
1651+
log.info("\n--- Window 2: PARTITION BY (department) ---")
1652+
searchByDept.foreach { case (dept, searchEmps) =>
1653+
val scrollEmps = scrollByDept(dept)
1654+
1655+
val searchFirst = searchEmps.flatMap(_.first_in_dept).distinct.head
1656+
val scrollFirst = scrollEmps.flatMap(_.first_in_dept).distinct.head
1657+
val searchLast = searchEmps.flatMap(_.last_in_dept).distinct.head
1658+
val scrollLast = scrollEmps.flatMap(_.last_in_dept).distinct.head
1659+
1660+
searchFirst shouldBe scrollFirst
1661+
searchLast shouldBe scrollLast
1662+
1663+
log.info(s"$dept: FIRST=$searchFirst, LAST=$searchLast (consistent)")
1664+
}
1665+
}
1666+
15501667
}

0 commit comments

Comments
 (0)