From 13f7c443d2ff8472784b9c3698871778cc0c90fe Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Thu, 6 Nov 2025 10:42:20 +0100 Subject: [PATCH 01/11] Add exponential histogram merge aggregation --- ...xponentialHistogramAggregatorFunction.java | 149 ++++++++++ ...alHistogramAggregatorFunctionSupplier.java | 47 ++++ ...alHistogramGroupingAggregatorFunction.java | 254 ++++++++++++++++++ .../ExponentialHistogramStates.java | 155 +++++++++++ .../MergeExponentialHistogramAggregator.java | 58 ++++ .../aggregate/AggregateWritables.java | 3 +- .../expression/function/aggregate/Merge.java | 85 ++++++ .../xpack/esql/planner/AggregateMapper.java | 3 +- 8 files changed, 752 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java new file mode 100644 index 0000000000000..bc4ebd785bb0f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java @@ -0,0 +1,149 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramScratch; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +/** + * {@link AggregatorFunction} implementation for {@link MergeExponentialHistogramAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. 
+ */ +public final class MergeExponentialHistogramAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("value", ElementType.EXPONENTIAL_HISTOGRAM) ); + + private final DriverContext driverContext; + + private final ExponentialHistogramStates.SingleState state; + + private final List channels; + + public MergeExponentialHistogramAggregatorFunction(DriverContext driverContext, + List channels, ExponentialHistogramStates.SingleState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static MergeExponentialHistogramAggregatorFunction create(DriverContext driverContext, + List channels) { + return new MergeExponentialHistogramAggregatorFunction(driverContext, channels, MergeExponentialHistogramAggregator.initSingle(driverContext)); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + } else if (mask.allTrue()) { + addRawInputNotMasked(page); + } else { + addRawInputMasked(page, mask); + } + } + + private void addRawInputMasked(Page page, BooleanVector mask) { + ExponentialHistogramBlock valueBlock = page.getBlock(channels.get(0)); + addRawBlock(valueBlock, mask); + } + + private void addRawInputNotMasked(Page page) { + ExponentialHistogramBlock valueBlock = page.getBlock(channels.get(0)); + addRawBlock(valueBlock); + } + + private void addRawBlock(ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + int valueValueCount = valueBlock.getValueCount(p); + if (valueValueCount == 0) { + continue; + } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueValueCount; + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + MergeExponentialHistogramAggregator.combine(state, valueValue); + } + } + } + + private void addRawBlock(ExponentialHistogramBlock valueBlock, BooleanVector mask) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + int valueValueCount = valueBlock.getValueCount(p); + if (valueValueCount == 0) { + continue; + } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueValueCount; + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + MergeExponentialHistogramAggregator.combine(state, valueValue); + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + assert value.getPositionCount() == 1; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + 
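// intermediate input: one pre-merged histogram per partial result (see the position-count assert above)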
MergeExponentialHistogramAggregator.combineIntermediate(state, value.getExponentialHistogram(value.getFirstValueIndex(0), valueScratch)); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + blocks[offset] = MergeExponentialHistogramAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..ab544bfeb2bbe --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link MergeExponentialHistogramAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. 
+ */ +public final class MergeExponentialHistogramAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public MergeExponentialHistogramAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + return MergeExponentialHistogramAggregatorFunction.intermediateStateDesc(); + } + + @Override + public List groupingIntermediateStateDesc() { + return MergeExponentialHistogramGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public MergeExponentialHistogramAggregatorFunction aggregator(DriverContext driverContext, + List channels) { + return MergeExponentialHistogramAggregatorFunction.create(driverContext, channels); + } + + @Override + public MergeExponentialHistogramGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return MergeExponentialHistogramGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "merge_exponential of histograms"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..176b68f47f1e2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java @@ -0,0 +1,254 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramScratch; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link MergeExponentialHistogramAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. 
+ */ +public final class MergeExponentialHistogramGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("value", ElementType.EXPONENTIAL_HISTOGRAM) ); + + private final ExponentialHistogramStates.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public MergeExponentialHistogramGroupingAggregatorFunction(List channels, + ExponentialHistogramStates.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static MergeExponentialHistogramGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new MergeExponentialHistogramGroupingAggregatorFunction(channels, MergeExponentialHistogramAggregator.initGrouping(driverContext.bigArrays(), driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + ExponentialHistogramBlock valueBlock = page.getBlock(channels.get(0)); + maybeEnableGroupIdTracking(seenGroupIds, valueBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, + ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + MergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { 
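+ // the group-ids block may be multi-valued: this position's histogram is merged into the state of every group id listed for it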
+ if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + MergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, + ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + MergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + MergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, + ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + MergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector 
groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + MergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, + ExponentialHistogramBlock valueBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = MergeExponentialHistogramAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java new file mode 100644 index 0000000000000..0b0cbb0add438 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; + +public final class ExponentialHistogramStates { + + // We currently use a hardcoded limit for the number of buckets, we might make this configurable / an aggregation parameter later + // The current default is what's also used by the OpenTelemetry SDKs + private static final int MAX_BUCKET_COUNT = 320; + + private record HistoBreaker(CircuitBreaker delegate) implements ExponentialHistogramCircuitBreaker { + @Override + public void adjustBreaker(long bytesAllocated) { + if (bytesAllocated < 0) { + delegate.addWithoutBreaking(bytesAllocated); + } else { + delegate.addEstimateBytesAndMaybeBreak(bytesAllocated, "ExponentialHistogram aggregation state"); + } + } + } + + private ExponentialHistogramStates() {} + + static final class SingleState implements AggregatorState { + + private final CircuitBreaker breaker; + // initialize lazily + private ExponentialHistogramMerger merger; + + SingleState(CircuitBreaker breaker) { + this.breaker = breaker; + } + + public void add(ExponentialHistogram histogram) { + if (histogram == null) { + return; + } + if (merger == null) { + merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, new HistoBreaker(breaker)); + } + merger.add(histogram); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + assert blocks.length >= offset + 1; + blocks[offset] = evaluateFinal(driverContext); + } + + @Override + public void close() { + Releasables.close(merger); + merger = null; + } + + public Block evaluateFinal(DriverContext driverContext) { + BlockFactory blockFactory = driverContext.blockFactory(); + if (merger == null) { + return blockFactory.newConstantNullBlock(1); + } else { + return blockFactory.newConstantExponentialHistogramBlock(merger.get(), 1); + } + } + } + + static final class GroupingState implements GroupingAggregatorState { + + private ObjectArray states; + private final HistoBreaker breaker; + private final BigArrays bigArrays; + + GroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.states = bigArrays.newObjectArray(1); + this.bigArrays = bigArrays; + this.breaker = new HistoBreaker(breaker); + } + + ExponentialHistogramMerger getOrNull(int position) { + if (position < states.size()) { + return states.get(position); + } else { + return null; + } + } + + public void add(int groupId, ExponentialHistogram histogram) { + if (histogram == null) { + return; + } + ensureCapacity(groupId); + var state = states.get(groupId); + if (state == null) { + state = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, breaker); + states.set(groupId, state); + } + state.add(histogram); + } + + private void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert 
blocks.length >= offset + 1 : "blocks=" + blocks.length + ",offset=" + offset; + blocks[offset] = evaluateFinal(selected, driverContext); + } + + public Block evaluateFinal(IntVector selected, DriverContext driverContext) { + try (var builder = driverContext.blockFactory().newExponentialHistogramBlockBuilder(selected.getPositionCount());) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int groupId = selected.getInt(i); + ExponentialHistogramMerger state = getOrNull(groupId); + if (state != null) { + builder.append(state.get()); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void close() { + for (int i = 0; i < states.size(); i++) { + Releasables.close(states.get(i)); + } + Releasables.close(states); + states = null; + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java new file mode 100644 index 0000000000000..81f5abfcba11b --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +@Aggregator({ @IntermediateState(name = "value", type = "EXPONENTIAL_HISTOGRAM"), }) +@GroupingAggregator +public class MergeExponentialHistogramAggregator { + + public static ExponentialHistogramStates.SingleState initSingle(DriverContext driverContext) { + return new ExponentialHistogramStates.SingleState(driverContext.breaker()); + } + + public static void combine(ExponentialHistogramStates.SingleState state, ExponentialHistogram value) { + state.add(value); + } + + public static void combineIntermediate(ExponentialHistogramStates.SingleState state, ExponentialHistogram value) { + state.add(value); + } + + public static Block evaluateFinal(ExponentialHistogramStates.SingleState state, DriverContext driverContext) { + return state.evaluateFinal(driverContext); + } + + public static ExponentialHistogramStates.GroupingState initGrouping(BigArrays bigArrays, DriverContext driverContext) { + return new ExponentialHistogramStates.GroupingState(bigArrays, driverContext.breaker()); + } + + public static void combine(ExponentialHistogramStates.GroupingState current, int groupId, ExponentialHistogram value) { + current.add(groupId, value); + } + + public static void combineIntermediate(ExponentialHistogramStates.GroupingState state, int groupId, ExponentialHistogram value) { + state.add(groupId, value); + } + + public static Block evaluateFinal( + 
ExponentialHistogramStates.GroupingState state, + IntVector selected, + GroupingAggregatorEvaluationContext ctx + ) { + return state.evaluateFinal(selected, ctx.driverContext()); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 5b3bddd89d093..bb3b539c7ea53 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -51,7 +51,8 @@ public static List getNamedWriteables() { PresentOverTime.ENTRY, Absent.ENTRY, AbsentOverTime.ENTRY, - DimensionValues.ENTRY + DimensionValues.ENTRY, + Merge.ENTRY ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java new file mode 100644 index 0000000000000..f0cfbd1caeb51 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.MergeExponentialHistogramAggregatorFunctionSupplier; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.planner.ToAggregator; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; + +public class Merge extends AggregateFunction implements ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Merge", Merge::new); + + @FunctionInfo(returnType = "exponential_histogram", type = FunctionType.AGGREGATE) + public Merge(Source source, @Param(name = "histogram", type = { "exponential_histogram" }) Expression field) { + this(source, field, Literal.TRUE); + } + + public Merge(Source source, Expression field, Expression filter) { + super(source, field, filter, NO_WINDOW, emptyList()); + } + + private Merge(StreamInput in) throws IOException { + super(in); + } + + @Override + public String 
getWriteableName() { + return ENTRY.name; + } + + @Override + public DataType dataType() { + return DataType.EXPONENTIAL_HISTOGRAM; + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> dt == DataType.EXPONENTIAL_HISTOGRAM, sourceText(), DEFAULT, "exponential_histogram"); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, Merge::new, field(), filter()); + } + + @Override + public Merge replaceChildren(List newChildren) { + return new Merge(source(), newChildren.get(0), newChildren.get(1)); + } + + public Merge withFilter(Expression filter) { + return new Merge(source(), field(), filter); + } + + @Override + public final AggregatorFunctionSupplier supplier() { + DataType type = field().dataType(); + if (type == DataType.EXPONENTIAL_HISTOGRAM) { + return new MergeExponentialHistogramAggregatorFunctionSupplier(); + } + throw EsqlIllegalArgumentException.illegalDataType(type); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java index 8d4e5ab0346dd..3981b71f316b0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java @@ -112,7 +112,8 @@ private static DataType toDataType(ElementType elementType) { case LONG -> DataType.LONG; case DOUBLE -> DataType.DOUBLE; case DOC -> DataType.DOC_DATA_TYPE; - case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new EsqlIllegalArgumentException( + case EXPONENTIAL_HISTOGRAM -> DataType.EXPONENTIAL_HISTOGRAM; + case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new EsqlIllegalArgumentException( "unsupported agg type: " + elementType ); }; From e731a32a31fc114a67fc6821c68373d46e1ad37e Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Thu, 6 Nov 2025 13:18:36 +0100 Subject: [PATCH 02/11] Implemented unit tests --- .../ExponentialHistogramStates.java | 19 ++- .../MergeExponentialHistogramAggregator.java | 8 +- .../ExponentialHistogramBlockBuilder.java | 2 - .../compute/test/BlockTestUtils.java | 25 ++- .../xpack/esql/EsqlTestUtils.java | 25 ++- .../esql/WriteableExponentialHistogram.java | 2 +- .../function/AbstractAggregationTestCase.java | 1 + .../function/MultiRowTestCaseSupplier.java | 26 +++ .../function/aggregate/MergeErrorTests.java | 48 ++++++ .../aggregate/MergeSerializationTests.java | 24 +++ .../function/aggregate/MergeTests.java | 159 ++++++++++++++++++ 11 files changed, 312 insertions(+), 27 deletions(-) create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java index 0b0cbb0add438..08eed3e453e98 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java +++ 
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java @@ -23,7 +23,7 @@ public final class ExponentialHistogramStates { // We currently use a hardcoded limit for the number of buckets, we might make this configurable / an aggregation parameter later // The current default is what's also used by the OpenTelemetry SDKs - private static final int MAX_BUCKET_COUNT = 320; + public static final int MAX_BUCKET_COUNT = 320; private record HistoBreaker(CircuitBreaker delegate) implements ExponentialHistogramCircuitBreaker { @Override @@ -48,14 +48,18 @@ static final class SingleState implements AggregatorState { this.breaker = breaker; } - public void add(ExponentialHistogram histogram) { + public void add(ExponentialHistogram histogram, boolean allowUpscale) { if (histogram == null) { return; } if (merger == null) { merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, new HistoBreaker(breaker)); } - merger.add(histogram); + if (allowUpscale) { + merger.add(histogram); + } else { + merger.addWithoutUpscaling(histogram); + } } @Override @@ -100,7 +104,7 @@ ExponentialHistogramMerger getOrNull(int position) { } } - public void add(int groupId, ExponentialHistogram histogram) { + public void add(int groupId, ExponentialHistogram histogram, boolean allowUpscale) { if (histogram == null) { return; } @@ -110,7 +114,11 @@ public void add(int groupId, ExponentialHistogram histogram) { state = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, breaker); states.set(groupId, state); } - state.add(histogram); + if (allowUpscale) { + state.add(histogram); + } else { + state.addWithoutUpscaling(histogram); + } } private void ensureCapacity(int groupId) { @@ -151,5 +159,6 @@ public void close() { public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java index 81f5abfcba11b..4796de9cf3a61 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java @@ -25,11 +25,11 @@ public static ExponentialHistogramStates.SingleState initSingle(DriverContext dr } public static void combine(ExponentialHistogramStates.SingleState state, ExponentialHistogram value) { - state.add(value); + state.add(value, true); } public static void combineIntermediate(ExponentialHistogramStates.SingleState state, ExponentialHistogram value) { - state.add(value); + state.add(value, false); } public static Block evaluateFinal(ExponentialHistogramStates.SingleState state, DriverContext driverContext) { @@ -41,11 +41,11 @@ public static ExponentialHistogramStates.GroupingState initGrouping(BigArrays bi } public static void combine(ExponentialHistogramStates.GroupingState current, int groupId, ExponentialHistogram value) { - current.add(groupId, value); + current.add(groupId, value, true); } public static void combineIntermediate(ExponentialHistogramStates.GroupingState state, int groupId, ExponentialHistogram value) { - state.add(groupId, value); + state.add(groupId, value, false); } public static Block evaluateFinal( diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java index f8aa1a708a3f9..7eb0ce01bf59c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java @@ -103,8 +103,6 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) { // We should add a dedicated encoding when building a block from computed histograms which do not originate from doc values // That encoding should be optimized for speed and support storing the zero threshold as (scale, index) pair ZeroBucket zeroBucket = histogram.zeroBucket(); - assert zeroBucket.compareZeroThreshold(ZeroBucket.minimalEmpty()) == 0 || zeroBucket.isIndexBased() == false - : "Current encoding only supports double-based zero thresholds"; BytesStreamOutput encodedBytes = new BytesStreamOutput(); try { diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index 53a89c642a919..0084b2c30c87d 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -29,8 +29,10 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ZeroBucket; import org.hamcrest.Matcher; import java.util.ArrayList; @@ -38,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.DoubleStream; import java.util.stream.IntStream; import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; @@ -380,7 +383,7 @@ public static Page convertBytesRefsToOrdinals(Page page) { public static ExponentialHistogram randomExponentialHistogram() { // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils - boolean hasNegativeValues = randomBoolean(); + int numBuckets = randomIntBetween(4, 300); boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = randomBoolean(); double[] rawValues = IntStream.concat( @@ -390,13 +393,19 @@ public static ExponentialHistogram randomExponentialHistogram() { ), hasZeroValues ? 
IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - - int numBuckets = randomIntBetween(4, 300); - ReleasableExponentialHistogram histo = ExponentialHistogram.create( - numBuckets, - ExponentialHistogramCircuitBreaker.noop(), - rawValues - ); + ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, ExponentialHistogramCircuitBreaker.noop(), rawValues); + // Setup a proper zeroThreshold based on a random chance + if (histo.zeroBucket().count() > 0 && randomBoolean()) { + double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); + double zeroThreshold = smallestNonZeroValue * randomDouble(); + try (ReleasableExponentialHistogram releaseAfterCopy = histo) { + ZeroBucket zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count()); + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(zeroBucket); + histo = builder.build(); + } + } + // Make the result histogram writeable to allow usage in Literals for testing return histo; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 3c466183b16db..2919e7c46d5cb 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -45,8 +45,11 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ZeroBucket; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.utils.Geohash; @@ -165,12 +168,14 @@ import java.util.TreeMap; import java.util.function.Predicate; import java.util.jar.JarInputStream; +import java.util.stream.DoubleStream; import java.util.stream.IntStream; import java.util.zip.ZipEntry; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; import static org.elasticsearch.test.ESTestCase.assertEquals; import static org.elasticsearch.test.ESTestCase.between; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; @@ -1045,7 +1050,7 @@ public static Literal randomLiteral(DataType type) { public static ExponentialHistogram randomExponentialHistogram() { // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils - boolean hasNegativeValues = randomBoolean(); + int numBuckets = randomIntBetween(4, 300); boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = 
randomBoolean(); double[] rawValues = IntStream.concat( @@ -1056,12 +1061,18 @@ public static ExponentialHistogram randomExponentialHistogram() { hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - int numBuckets = randomIntBetween(4, 300); - ReleasableExponentialHistogram histo = ExponentialHistogram.create( - numBuckets, - ExponentialHistogramCircuitBreaker.noop(), - rawValues - ); + ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, ExponentialHistogramCircuitBreaker.noop(), rawValues); + // Setup a proper zeroThreshold based on a random chance + if (histo.zeroBucket().count() > 0 && randomBoolean()) { + double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); + double zeroThreshold = smallestNonZeroValue * randomDouble(); + try (ReleasableExponentialHistogram releaseAfterCopy = histo) { + ZeroBucket zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count()); + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(zeroBucket); + histo = builder.build(); + } + } // Make the result histogram writeable to allow usage in Literals for testing return new WriteableExponentialHistogram(histo); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java index bbcdb989e7f8a..65264e8456c38 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java @@ -41,7 +41,7 @@ public class WriteableExponentialHistogram extends AbstractExponentialHistogram private final ExponentialHistogram delegate; - WriteableExponentialHistogram(ExponentialHistogram delegate) { + public WriteableExponentialHistogram(ExponentialHistogram delegate) { this.delegate = delegate; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java index 4d62c04528c55..50b845c8ed328 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java @@ -561,6 +561,7 @@ protected static String standardAggregatorName(String prefix, DataType type) { case IP -> "Ip"; case DATETIME, DATE_NANOS, LONG, COUNTER_LONG, UNSIGNED_LONG, GEOHASH, GEOTILE, GEOHEX -> "Long"; case AGGREGATE_METRIC_DOUBLE -> "AggregateMetricDouble"; + case EXPONENTIAL_HISTOGRAM -> "ExponentialHistogram"; case NULL -> "Null"; default -> throw new UnsupportedOperationException("name for [" + type + "]"); }; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java index 93f650398a605..879f43d2f90ea 100644 --- 
a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.Geometry; @@ -21,6 +22,8 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.WriteableExponentialHistogram; +import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.versionfield.Version; @@ -491,6 +494,29 @@ public static List geohexCases(int minRows, int maxRows) { return cases; } + public static List exponentialHistogramCases(int minRows, int maxRows) { + List cases = new ArrayList<>(); + if (EsqlCorePlugin.EXPONENTIAL_HISTOGRAM_FEATURE_FLAG.isEnabled()) { + addSuppliers( + cases, + minRows, + maxRows, + "empty exponential histograms", + DataType.EXPONENTIAL_HISTOGRAM, + () -> new WriteableExponentialHistogram(ExponentialHistogram.empty()) + ); + addSuppliers( + cases, + minRows, + maxRows, + "random exponential histograms", + DataType.EXPONENTIAL_HISTOGRAM, + EsqlTestUtils::randomExponentialHistogram + ); + } + return cases; + } + public static List stringCases(int minRows, int maxRows, DataType type) { List cases = new ArrayList<>(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java new file mode 100644 index 0000000000000..1a1ed18182e6c --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.ErrorsForCasesWithoutExamplesTestCase; +import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; +import org.hamcrest.Matcher; +import org.junit.Before; + +import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; + +public class MergeErrorTests extends ErrorsForCasesWithoutExamplesTestCase { + + @Before + public void setup() { + assumeTrue( + "Only when esql_exponential_histogram feature flag is enabled", + EsqlCorePlugin.EXPONENTIAL_HISTOGRAM_FEATURE_FLAG.isEnabled() + ); + } + + @Override + protected List cases() { + return paramsToSuppliers(MergeTests.parameters()); + } + + @Override + protected Expression build(Source source, List args) { + return new Merge(source, args.get(0)); + } + + @Override + protected Matcher expectedTypeErrorMatcher(List> validPerPosition, List signature) { + return equalTo(typeErrorMessage(false, validPerPosition, signature, (v, p) -> "exponential_histogram")); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java new file mode 100644 index 0000000000000..7136aa113e84b --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests; + +import java.io.IOException; + +public class MergeSerializationTests extends AbstractExpressionSerializationTests { + @Override + protected Merge createTestInstance() { + return new Merge(randomSource(), randomChild()); + } + + @Override + protected Merge mutateInstance(Merge instance) throws IOException { + return new Merge(instance.source(), randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild)); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java new file mode 100644 index 0000000000000..b2621b4f06b60 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; +import org.elasticsearch.exponentialhistogram.ZeroBucket; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase; +import org.elasticsearch.xpack.esql.expression.function.MultiRowTestCaseSupplier; +import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.compute.aggregation.ExponentialHistogramStates.MAX_BUCKET_COUNT; +import static org.hamcrest.Matchers.equalTo; + +public class MergeTests extends AbstractAggregationTestCase { + public MergeTests(@Name("TestCase") Supplier testCaseSupplier) { + this.testCase = testCaseSupplier.get(); + } + + @ParametersFactory + public static Iterable parameters() { + var suppliers = new ArrayList(); + + Stream.of(MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100)) + .flatMap(List::stream) + .map(MergeTests::makeSupplier) + .collect(Collectors.toCollection(() -> suppliers)); + + return parameterSuppliersFromTypedDataWithDefaultChecks(suppliers, true); + } + + @Override + protected Expression build(Source source, List args) { + return new Merge(source, args.get(0)); + } + + private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier fieldSupplier) { + return new TestCaseSupplier(List.of(fieldSupplier.type()), () -> { + var fieldTypedData = fieldSupplier.get(); + var fieldValues = fieldTypedData.multiRowData(); + + ExponentialHistogramMerger merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, ExponentialHistogramCircuitBreaker.noop()); + + boolean anyValuesNonNull = false; + + for (var fieldValue : fieldValues) { + ExponentialHistogram histogram = (ExponentialHistogram) fieldValue; + if (histogram != null) { + anyValuesNonNull = true; + merger.add(histogram); + } + } + + var expected = anyValuesNonNull ? 
merger.get() : null; + return new TestCaseSupplier.TestCase( + List.of(fieldTypedData), + standardAggregatorName("Merge", fieldSupplier.type()), + DataType.EXPONENTIAL_HISTOGRAM, + equalToWithLenientZeroBucket(expected) + ); + }); + } + + private static Matcher equalToWithLenientZeroBucket(ExponentialHistogram expected) { + // if there is no zero-threshold involved, merging is deterministic + if (expected.zeroBucket().zeroThreshold() == 0) { + return equalTo(expected); + } + + // if there is a zero-threshold involed, things get a little more hairy + // the exact merge result depends on the order in which downscales happen vs when the highest zero threshold is seen + // this means the zero-bucket can be different to the expected result and the scale can slightly differ + // we fix this by adjusting both histograms to the same scale + + return new BaseMatcher() { + @Override + public boolean matches(Object actualObj) { + if (actualObj instanceof ExponentialHistogram == false) { + return false; + } + ExponentialHistogram actual = (ExponentialHistogram) actualObj; + + // step one: bring both histogram to the same scale + int targetScale = Math.min(actual.scale(), expected.scale()); + ExponentialHistogram a = downscaleTo(actual, targetScale); + ExponentialHistogram b = downscaleTo(expected, targetScale); + + // step two: bring the zero-threshold of both histograms to the same value (the higher one) + ZeroBucket targetZeroBucket; + if (a.zeroBucket().compareZeroThreshold(b.zeroBucket()) >= 0) { + targetZeroBucket = a.zeroBucket(); + } else { + targetZeroBucket = b.zeroBucket(); + } + a = increaseZeroThreshold(a, targetZeroBucket); + b = increaseZeroThreshold(b, targetZeroBucket); + // now they should actually be equal! + return a.equals(b); + } + + @Override + public void describeTo(Description description) { + description.appendValue(expected); + } + }; + } + + private static ExponentialHistogram downscaleTo(ExponentialHistogram histogram, int targetScale) { + assert histogram.scale() >= targetScale; + ExponentialHistogramMerger merger = ExponentialHistogramMerger.createWithMaxScale(MAX_BUCKET_COUNT, targetScale, ExponentialHistogramCircuitBreaker.noop()); + merger.addWithoutUpscaling(histogram); + return merger.get(); + } + + private static ExponentialHistogram increaseZeroThreshold(ExponentialHistogram histo, ZeroBucket targetZeroBucket) { + ExponentialHistogramMerger merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, ExponentialHistogramCircuitBreaker.noop()); + merger.addWithoutUpscaling(histo); + // now add a histogram with only the zero-threshold with a count of 1 to trigger merging of overlapping buckets + merger.add( + ExponentialHistogram.builder(ExponentialHistogram.MAX_SCALE, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(copyWithNewCount(targetZeroBucket, 1)) + .build() + ); + // the merger now has the desired zero-threshold, but we need to subtract the fake zero count again + ExponentialHistogram mergeResult = merger.get(); + return ExponentialHistogram.builder(mergeResult, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(copyWithNewCount(mergeResult.zeroBucket(), mergeResult.zeroBucket().count() - 1)) + .build(); + } + + private static ZeroBucket copyWithNewCount(ZeroBucket zb, long newCount) { + if (zb.isIndexBased()) { + return ZeroBucket.create(zb.index(), zb.scale(), newCount); + } else { + return ZeroBucket.create(zb.zeroThreshold(), newCount); + } + } +} From 59fa9784da34d894d2bd343d90bd6c91421f7b0c Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: 
Fri, 7 Nov 2025 12:03:12 +0100 Subject: [PATCH 03/11] Implemented percentile agg as surrogate --- .../function/aggregate/Percentile.java | 10 +++- .../xpack/esql/analysis/AnalyzerTests.java | 4 +- .../aggregate/PercentileErrorTests.java | 8 ++- .../function/aggregate/PercentileTests.java | 55 ++++++++++++++----- 4 files changed, 58 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java index 23b55c429c009..cf7d28aa4e6c7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.esql.expression.function.FunctionType; import org.elasticsearch.xpack.esql.expression.function.Param; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDouble; +import org.elasticsearch.xpack.esql.expression.function.scalar.histogram.HistogramPercentile; import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvPercentile; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; @@ -74,7 +75,7 @@ public class Percentile extends NumericAggregate implements SurrogateExpression ) public Percentile( Source source, - @Param(name = "number", type = { "double", "integer", "long" }) Expression field, + @Param(name = "number", type = { "double", "integer", "long", "exponential_histogram" }) Expression field, @Param(name = "percentile", type = { "double", "integer", "long" }) Expression percentile ) { this(source, field, Literal.TRUE, NO_WINDOW, percentile); @@ -127,10 +128,10 @@ protected TypeResolution resolveType() { TypeResolution resolution = isType( field(), - dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, + dt -> (dt.isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.EXPONENTIAL_HISTOGRAM, sourceText(), FIRST, - "numeric except unsigned_long" + "exponential_histogram or numeric except unsigned_long" ); if (resolution.unresolved()) { return resolution; @@ -168,6 +169,9 @@ private double percentileValue() { public Expression surrogate() { var field = field(); + if (field.dataType() == DataType.EXPONENTIAL_HISTOGRAM) { + return new HistogramPercentile(source(), new Merge(source(), field, filter()), percentile()); + } if (field.foldable()) { return new MvPercentile(source(), new ToDouble(source(), field), percentile()); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 76eba5fd0eb2e..dae1d12a8dc96 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2051,7 +2051,7 @@ public void testUnsupportedTypesInStats() { found value [x] type [unsigned_long] line 2:58: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ found value [x] type [unsigned_long] - line 2:96: first argument of [percentile(x, 10)] must be [numeric except unsigned_long],\ + line 2:96: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long],\ found value [x] type [unsigned_long] 
line 2:115: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ found value [x] type [unsigned_long]"""); @@ -2067,7 +2067,7 @@ public void testUnsupportedTypesInStats() { found value [x] type [version] line 2:29: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ found value [x] type [version] - line 2:59: first argument of [percentile(x, 10)] must be [numeric except unsigned_long], found value [x] type [version] + line 2:59: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long], found value [x] type [version] line 2:78: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ found value [x] type [version]"""); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java index b2f701f41792b..dbfd3cae0c5f3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java @@ -32,6 +32,12 @@ protected Expression build(Source source, List args) { @Override protected Matcher expectedTypeErrorMatcher(List> validPerPosition, List signature) { - return equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> "numeric except unsigned_long")); + return equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> + switch(p) { + case 0 -> "exponential_histogram or numeric except unsigned_long"; + case 1 -> "numeric except unsigned_long"; + default -> throw new IllegalStateException("Unexpected argument index " + p); + } + )); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java index bd8f6e96cff32..e0a2ab40879dc 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java @@ -11,6 +11,10 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Types; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile; import org.elasticsearch.search.aggregations.metrics.TDigestState; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -21,9 +25,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Stream; +import static org.elasticsearch.compute.aggregation.ExponentialHistogramStates.MAX_BUCKET_COUNT; import static org.hamcrest.Matchers.equalTo; public class PercentileTests extends AbstractAggregationTestCase { @@ -38,7 +44,8 @@ public static Iterable parameters() { var fieldCases = Stream.of( 
MultiRowTestCaseSupplier.intCases(1, 1000, Integer.MIN_VALUE, Integer.MAX_VALUE, true), MultiRowTestCaseSupplier.longCases(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE, true), - MultiRowTestCaseSupplier.doubleCases(1, 1000, -Double.MAX_VALUE, Double.MAX_VALUE, true) + MultiRowTestCaseSupplier.doubleCases(1, 1000, -Double.MAX_VALUE, Double.MAX_VALUE, true), + MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100) ).flatMap(List::stream).toList(); var percentileCases = Stream.of( @@ -71,20 +78,42 @@ private static TestCaseSupplier makeSupplier( var percentile = ((Number) percentileTypedData.data()).doubleValue(); - try (var digest = TDigestState.create(newLimitedBreaker(ByteSizeValue.ofMb(100)), 1000)) { - for (var value : fieldTypedData.multiRowData()) { - digest.add(((Number) value).doubleValue()); - } + Double expected; + if (fieldTypedData.type() == DataType.EXPONENTIAL_HISTOGRAM) { + // Note that the merging used underneath can be dependent on the order if zero-buckets are involved + // therefore the percentile in theory could vary slightly + // however, it seems that the order is the same in the tests vs the reference computation + // if we ever encounter flakes here, we should replace the equalTo() assertion with an assertion on the relative error + expected = getExpectedPercentileForExponentialHistograms(Types.forciblyCast(fieldTypedData.multiRowData()), percentile); + } else { + expected = getExpectedPercentileForNumbers(Types.forciblyCast(fieldTypedData.multiRowData()), percentile); + } - var expected = digest.size() == 0 ? null : digest.quantile(percentile / 100); + return new TestCaseSupplier.TestCase( + List.of(fieldTypedData, percentileTypedData), + standardAggregatorName("Percentile", fieldSupplier.type()), + DataType.DOUBLE, + equalTo(expected) + ); + }); + } - return new TestCaseSupplier.TestCase( - List.of(fieldTypedData, percentileTypedData), - standardAggregatorName("Percentile", fieldSupplier.type()), - DataType.DOUBLE, - equalTo(expected) - ); + private static Double getExpectedPercentileForNumbers(List values, double percentile) { + try (var digest = TDigestState.create(newLimitedBreaker(ByteSizeValue.ofMb(100)), 1000)) { + for (var value : values) { + digest.add(value.doubleValue()); } - }); + return digest.size() == 0 ? null : digest.quantile(percentile / 100); + } + } + + private static Double getExpectedPercentileForExponentialHistograms(List values, double percentile) { + ExponentialHistogram merged = ExponentialHistogram.merge( + MAX_BUCKET_COUNT, + ExponentialHistogramCircuitBreaker.noop(), + values.stream().filter(Objects::nonNull).toList().iterator() + ); + double result = ExponentialHistogramQuantile.getQuantile(merged, percentile / 100.0); + return Double.isNaN(result) ? 
null : result; } } From 5e2e248d2aa84e2f5848ef1d518e9e3a8d2933b0 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 7 Nov 2025 11:15:47 +0000 Subject: [PATCH 04/11] [CI] Auto commit changes from spotless --- .../compute/test/BlockTestUtils.java | 9 ++++-- .../xpack/esql/EsqlTestUtils.java | 11 ++++--- .../xpack/esql/analysis/AnalyzerTests.java | 31 ++++++++++--------- .../function/aggregate/MergeTests.java | 15 ++++++--- .../aggregate/PercentileErrorTests.java | 12 +++---- 5 files changed, 47 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index 0084b2c30c87d..9f0a0be60d5f7 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -383,7 +383,8 @@ public static Page convertBytesRefsToOrdinals(Page page) { public static ExponentialHistogram randomExponentialHistogram() { // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils - int numBuckets = randomIntBetween(4, 300); boolean hasNegativeValues = randomBoolean(); + int numBuckets = randomIntBetween(4, 300); + boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = randomBoolean(); double[] rawValues = IntStream.concat( @@ -393,7 +394,11 @@ public static ExponentialHistogram randomExponentialHistogram() { ), hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, ExponentialHistogramCircuitBreaker.noop(), rawValues); + ReleasableExponentialHistogram histo = ExponentialHistogram.create( + numBuckets, + ExponentialHistogramCircuitBreaker.noop(), + rawValues + ); // Setup a proper zeroThreshold based on a random chance if (histo.zeroBucket().count() > 0 && randomBoolean()) { double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 2919e7c46d5cb..026425eef0fd0 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -47,7 +47,6 @@ import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; -import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.elasticsearch.exponentialhistogram.ZeroBucket; import org.elasticsearch.geo.GeometryTestUtils; @@ -175,7 +174,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; -import static 
org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; import static org.elasticsearch.test.ESTestCase.assertEquals; import static org.elasticsearch.test.ESTestCase.between; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; @@ -1050,7 +1048,8 @@ public static Literal randomLiteral(DataType type) { public static ExponentialHistogram randomExponentialHistogram() { // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils - int numBuckets = randomIntBetween(4, 300); boolean hasNegativeValues = randomBoolean(); + int numBuckets = randomIntBetween(4, 300); + boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = randomBoolean(); double[] rawValues = IntStream.concat( @@ -1061,7 +1060,11 @@ public static ExponentialHistogram randomExponentialHistogram() { hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, ExponentialHistogramCircuitBreaker.noop(), rawValues); + ReleasableExponentialHistogram histo = ExponentialHistogram.create( + numBuckets, + ExponentialHistogramCircuitBreaker.noop(), + rawValues + ); // Setup a proper zeroThreshold based on a random chance if (histo.zeroBucket().count() > 0 && randomBoolean()) { double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index dae1d12a8dc96..e9e396f8bde4e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2056,20 +2056,23 @@ public void testUnsupportedTypesInStats() { line 2:115: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ found value [x] type [unsigned_long]"""); - verifyUnsupported(""" - row x = to_version("1.2") - | stats avg(x), median(x), median_absolute_deviation(x), percentile(x, 10), sum(x) - """, """ - Found 5 problems - line 2:10: argument of [avg(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ - found value [x] type [version] - line 2:18: argument of [median(x)] must be [numeric except unsigned_long or counter types],\ - found value [x] type [version] - line 2:29: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ - found value [x] type [version] - line 2:59: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long], found value [x] type [version] - line 2:78: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ - found value [x] type [version]"""); + verifyUnsupported( + """ + row x = to_version("1.2") + | stats avg(x), median(x), median_absolute_deviation(x), percentile(x, 10), sum(x) + """, + """ + Found 5 problems + line 2:10: argument of [avg(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ + found value [x] type [version] + 
line 2:18: argument of [median(x)] must be [numeric except unsigned_long or counter types],\ + found value [x] type [version] + line 2:29: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ + found value [x] type [version] + line 2:59: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long], found value [x] type [version] + line 2:78: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ + found value [x] type [version]""" + ); } public void testInOnText() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java index b2621b4f06b60..d912130af0ab0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java @@ -60,7 +60,10 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier var fieldTypedData = fieldSupplier.get(); var fieldValues = fieldTypedData.multiRowData(); - ExponentialHistogramMerger merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, ExponentialHistogramCircuitBreaker.noop()); + ExponentialHistogramMerger merger = ExponentialHistogramMerger.create( + MAX_BUCKET_COUNT, + ExponentialHistogramCircuitBreaker.noop() + ); boolean anyValuesNonNull = false; @@ -128,7 +131,11 @@ public void describeTo(Description description) { private static ExponentialHistogram downscaleTo(ExponentialHistogram histogram, int targetScale) { assert histogram.scale() >= targetScale; - ExponentialHistogramMerger merger = ExponentialHistogramMerger.createWithMaxScale(MAX_BUCKET_COUNT, targetScale, ExponentialHistogramCircuitBreaker.noop()); + ExponentialHistogramMerger merger = ExponentialHistogramMerger.createWithMaxScale( + MAX_BUCKET_COUNT, + targetScale, + ExponentialHistogramCircuitBreaker.noop() + ); merger.addWithoutUpscaling(histogram); return merger.get(); } @@ -139,8 +146,8 @@ private static ExponentialHistogram increaseZeroThreshold(ExponentialHistogram h // now add a histogram with only the zero-threshold with a count of 1 to trigger merging of overlapping buckets merger.add( ExponentialHistogram.builder(ExponentialHistogram.MAX_SCALE, ExponentialHistogramCircuitBreaker.noop()) - .zeroBucket(copyWithNewCount(targetZeroBucket, 1)) - .build() + .zeroBucket(copyWithNewCount(targetZeroBucket, 1)) + .build() ); // the merger now has the desired zero-threshold, but we need to subtract the fake zero count again ExponentialHistogram mergeResult = merger.get(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java index dbfd3cae0c5f3..4c0440de3d8af 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java @@ -32,12 +32,10 @@ protected Expression build(Source source, List args) { @Override protected Matcher expectedTypeErrorMatcher(List> validPerPosition, List signature) { - return 
equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> - switch(p) { - case 0 -> "exponential_histogram or numeric except unsigned_long"; - case 1 -> "numeric except unsigned_long"; - default -> throw new IllegalStateException("Unexpected argument index " + p); - } - )); + return equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> switch (p) { + case 0 -> "exponential_histogram or numeric except unsigned_long"; + case 1 -> "numeric except unsigned_long"; + default -> throw new IllegalStateException("Unexpected argument index " + p); + })); } } From 8b4d0a375fce6e5ec0381b6a1d90a45a629ccf7e Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 7 Nov 2025 13:07:57 +0100 Subject: [PATCH 05/11] fix checkstyle --- .../xpack/esql/analysis/AnalyzerTests.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index e9e396f8bde4e..ee5a2a653b67d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2056,23 +2056,21 @@ public void testUnsupportedTypesInStats() { line 2:115: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ found value [x] type [unsigned_long]"""); - verifyUnsupported( - """ - row x = to_version("1.2") - | stats avg(x), median(x), median_absolute_deviation(x), percentile(x, 10), sum(x) - """, - """ - Found 5 problems - line 2:10: argument of [avg(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ - found value [x] type [version] - line 2:18: argument of [median(x)] must be [numeric except unsigned_long or counter types],\ - found value [x] type [version] - line 2:29: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ - found value [x] type [version] - line 2:59: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long], found value [x] type [version] - line 2:78: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ - found value [x] type [version]""" - ); + verifyUnsupported(""" + row x = to_version("1.2") + | stats avg(x), median(x), median_absolute_deviation(x), percentile(x, 10), sum(x) + """, """ + Found 5 problems + line 2:10: argument of [avg(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ + found value [x] type [version] + line 2:18: argument of [median(x)] must be [numeric except unsigned_long or counter types],\ + found value [x] type [version] + line 2:29: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ + found value [x] type [version] + line 2:59: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long],\ + found value [x] type [version] + line 2:78: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ + found value [x] type [version]"""); } public void testInOnText() { From cc19f269caaf72c991421fcc1766565102c74aa7 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 7 Nov 2025 13:32:23 +0100 Subject: [PATCH 06/11] Add CSV tests --- 
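Note: the expected percentile values in the csv-spec cases below can be cross-checked against the
exponential histogram library directly, by merging the per-document histograms and reading the
quantile off the merged result, roughly what the Merge surrogate does under the hood and mirroring
the reference computation used in PercentileTests. A minimal, illustrative sketch only; the class
name, sample values and the bucket budget of 160 are made up here, and the method signatures are
assumed to match their usage in the tests of this series:

    import java.util.List;

    import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
    import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
    import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile;
    import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram;

    public class PercentileCrossCheck {
        public static void main(String[] args) {
            ExponentialHistogramCircuitBreaker breaker = ExponentialHistogramCircuitBreaker.noop();
            // Two "documents" worth of raw response times, bucketed into exponential histograms.
            ReleasableExponentialHistogram doc1 = ExponentialHistogram.create(160, breaker, new double[] { 0.001, 0.002, 0.004, 1.5 });
            ReleasableExponentialHistogram doc2 = ExponentialHistogram.create(160, breaker, new double[] { 0.003, 0.008, 6.8 });
            // Merge the per-document histograms into a single one ...
            ExponentialHistogram merged = ExponentialHistogram.merge(160, breaker, List.<ExponentialHistogram>of(doc1, doc2).iterator());
            // ... then read percentiles off the merged histogram (the quantile argument is in [0, 1]).
            double p50 = ExponentialHistogramQuantile.getQuantile(merged, 0.50);
            double p99 = ExponentialHistogramQuantile.getQuantile(merged, 0.99);
            System.out.println("p50=" + p50 + " p99=" + p99);
        }
    }
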
.../resources/exponential_histogram.csv-spec | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec index 941d1abcdb72f..af7e5d10a5a1a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec @@ -13,3 +13,29 @@ dummy-positive_only | "{""scale"":2,""sum"":1275.0,""min"":1.0,""max"":50. dummy-zero_count_only | "{""scale"":2,""sum"":0.0,""min"":0.0,""max"":0.0,""zero"":{""count"":101}}" dummy-zero_threshold_only | "{""scale"":0,""sum"":0.0,""zero"":{""threshold"":2.0E-5}}" ; + +ungroupedPercentiles +required_capability: exponential_histogram + +FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") + | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) +; + +p0:double | p50:double | p99:double | p100:double +2.17E-4 | 0.0016965155735109547 | 0.9472324359683735 | 6.786232 +; + +groupedPercentiles +required_capability: exponential_histogram + +FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") + | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) BY instance + | KEEP instance, p0, p50, p99, p100 + | SORT instance +; + +instance:keyword | p0:double | p50:double | p99:double | p100:double +instance-0 | 2.4E-4 | 0.02114040198637821 | 1.043294617499776 | 6.786232 +instance-1 | 2.17E-4 | 6.469364299069457E-4 | 0.1422151059789435 | 3.190723 +instance-2 | 2.2E-4 | 6.469364299069457E-4 | 0.08576721941805483 | 2.7059714542564097 +; From 001d7b172537863a0b5940f8de2bf43677861d58 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 7 Nov 2025 14:41:48 +0100 Subject: [PATCH 07/11] Add rounding to CSV tests to avoid floating point trouble --- .../main/resources/exponential_histogram.csv-spec | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec index af7e5d10a5a1a..4479eb59c0183 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec @@ -19,10 +19,12 @@ required_capability: exponential_histogram FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) + | EVAL p50 = ROUND(p50, 7), p99 = ROUND(p99, 7) // rounding to avoid floating point precision issues, min and max are exact so no rounding needed + | KEEP p0, p50, p99, p100 ; -p0:double | p50:double | p99:double | p100:double -2.17E-4 | 0.0016965155735109547 | 0.9472324359683735 | 6.786232 +p0:double | p50:double | p99:double | p100:double +2.17E-4 | 0.0016965 | 0.9472324 | 6.786232 ; groupedPercentiles @@ -30,12 +32,13 @@ required_capability: exponential_histogram FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = 
PERCENTILE(responseTime,100) BY instance + | EVAL p50 = ROUND(p50, 7), p99 = ROUND(p99, 7) // rounding to avoid floating point precision issues, min and max are exact so no rounding needed | KEEP instance, p0, p50, p99, p100 | SORT instance ; -instance:keyword | p0:double | p50:double | p99:double | p100:double -instance-0 | 2.4E-4 | 0.02114040198637821 | 1.043294617499776 | 6.786232 -instance-1 | 2.17E-4 | 6.469364299069457E-4 | 0.1422151059789435 | 3.190723 -instance-2 | 2.2E-4 | 6.469364299069457E-4 | 0.08576721941805483 | 2.7059714542564097 +instance:keyword | p0:double | p50:double | p99:double | p100:double +instance-0 | 2.4E-4 | 0.0211404 | 1.0432946 | 6.786232 +instance-1 | 2.17E-4 | 6.469E-4 | 0.1422151 | 3.190723 +instance-2 | 2.2E-4 | 6.469E-4 | 0.0857672 | 2.7059714542564097 ; From b026eba65cbdf4d89a46fa9bad14fadefea19653 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 7 Nov 2025 15:07:42 +0100 Subject: [PATCH 08/11] Add javadoc --- .../xpack/esql/expression/function/aggregate/Merge.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java index f0cfbd1caeb51..5169db06b7792 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java @@ -29,6 +29,11 @@ import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; +/** + * Merges multiple histograms into a single histogram. + * Note that this function is currently only intended for usage in surrogates and not available as a user-facing function. + * Therefore, it is intentionally not registered in {@link org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry}. + */ public class Merge extends AggregateFunction implements ToAggregator { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Merge", Merge::new); From f6601f1606a09078780fe68edcb2c5aabb8d9647 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 7 Nov 2025 15:30:39 +0100 Subject: [PATCH 09/11] use separate capability for percentiles to make bwc tests happy --- .../src/main/resources/exponential_histogram.csv-spec | 4 ++-- .../elasticsearch/xpack/esql/action/EsqlCapabilities.java | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec index 4479eb59c0183..bf2be6cfed0db 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec @@ -15,7 +15,7 @@ dummy-zero_threshold_only | "{""scale"":0,""sum"":0.0,""zero"":{""threshold"":2. 
; ungroupedPercentiles -required_capability: exponential_histogram +required_capability: exponential_histogram_percentiles_support FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) @@ -28,7 +28,7 @@ p0:double | p50:double | p99:double | p100:double ; groupedPercentiles -required_capability: exponential_histogram +required_capability: exponential_histogram_percentiles_support FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) BY instance diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index bd7e924f84cc1..54eca8af0847c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1637,6 +1637,11 @@ public enum Cap { FULL_TEXT_FUNCTIONS_ACCEPT_NULL_FIELD, + /** + * Support for exponential_histogram type in PERCENTILES aggregation. + */ + EXPONENTIAL_HISTOGRAM_PERCENTILES_SUPPORT(EXPONENTIAL_HISTOGRAM_FEATURE_FLAG), + // Last capability should still have a comma for fewer merge conflicts when adding new ones :) // This comment prevents the semicolon from being on the previous capability when Spotless formats the file. ; From 1d203c92dc0154a5d4c34d10dd73026351f69a39 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Thu, 13 Nov 2025 08:04:28 +0100 Subject: [PATCH 10/11] Rename Merge to HistogramMerge --- ...ponentialHistogramAggregatorFunction.java} | 20 +++++++------- ...lHistogramAggregatorFunctionSupplier.java} | 22 ++++++++-------- ...lHistogramGroupingAggregatorFunction.java} | 26 +++++++++---------- ...mMergeExponentialHistogramAggregator.java} | 2 +- .../aggregate/AggregateWritables.java | 2 +- .../{Merge.java => HistogramMerge.java} | 26 +++++++++---------- .../function/aggregate/Percentile.java | 2 +- ...sts.java => HistogramMergeErrorTests.java} | 6 ++--- ... 
=> HistogramMergeSerializationTests.java} | 10 +++---- ...rgeTests.java => HistogramMergeTests.java} | 10 +++---- 10 files changed, 63 insertions(+), 63 deletions(-) rename x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/{MergeExponentialHistogramAggregatorFunction.java => HistogramMergeExponentialHistogramAggregatorFunction.java} (82%) rename x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/{MergeExponentialHistogramAggregatorFunctionSupplier.java => HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java} (50%) rename x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/{MergeExponentialHistogramGroupingAggregatorFunction.java => HistogramMergeExponentialHistogramGroupingAggregatorFunction.java} (85%) rename x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/{MergeExponentialHistogramAggregator.java => HistogramMergeExponentialHistogramAggregator.java} (97%) rename x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/{Merge.java => HistogramMerge.java} (72%) rename x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/{MergeErrorTests.java => HistogramMergeErrorTests.java} (87%) rename x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/{MergeSerializationTests.java => HistogramMergeSerializationTests.java} (51%) rename x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/{MergeTests.java => HistogramMergeTests.java} (95%) diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunction.java similarity index 82% rename from x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java rename to x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunction.java index bc4ebd785bb0f..85a2298a76a2d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunction.java @@ -19,10 +19,10 @@ import org.elasticsearch.exponentialhistogram.ExponentialHistogram; /** - * {@link AggregatorFunction} implementation for {@link MergeExponentialHistogramAggregator}. + * {@link AggregatorFunction} implementation for {@link HistogramMergeExponentialHistogramAggregator}. * This class is generated. Edit {@code AggregatorImplementer} instead. 
*/ -public final class MergeExponentialHistogramAggregatorFunction implements AggregatorFunction { +public final class HistogramMergeExponentialHistogramAggregatorFunction implements AggregatorFunction { private static final List INTERMEDIATE_STATE_DESC = List.of( new IntermediateStateDesc("value", ElementType.EXPONENTIAL_HISTOGRAM) ); @@ -32,16 +32,16 @@ public final class MergeExponentialHistogramAggregatorFunction implements Aggreg private final List channels; - public MergeExponentialHistogramAggregatorFunction(DriverContext driverContext, + public HistogramMergeExponentialHistogramAggregatorFunction(DriverContext driverContext, List channels, ExponentialHistogramStates.SingleState state) { this.driverContext = driverContext; this.channels = channels; this.state = state; } - public static MergeExponentialHistogramAggregatorFunction create(DriverContext driverContext, - List channels) { - return new MergeExponentialHistogramAggregatorFunction(driverContext, channels, MergeExponentialHistogramAggregator.initSingle(driverContext)); + public static HistogramMergeExponentialHistogramAggregatorFunction create( + DriverContext driverContext, List channels) { + return new HistogramMergeExponentialHistogramAggregatorFunction(driverContext, channels, HistogramMergeExponentialHistogramAggregator.initSingle(driverContext)); } public static List intermediateStateDesc() { @@ -85,7 +85,7 @@ private void addRawBlock(ExponentialHistogramBlock valueBlock) { int valueEnd = valueStart + valueValueCount; for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); - MergeExponentialHistogramAggregator.combine(state, valueValue); + HistogramMergeExponentialHistogramAggregator.combine(state, valueValue); } } } @@ -104,7 +104,7 @@ private void addRawBlock(ExponentialHistogramBlock valueBlock, BooleanVector mas int valueEnd = valueStart + valueValueCount; for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); - MergeExponentialHistogramAggregator.combine(state, valueValue); + HistogramMergeExponentialHistogramAggregator.combine(state, valueValue); } } } @@ -120,7 +120,7 @@ public void addIntermediateInput(Page page) { ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; assert value.getPositionCount() == 1; ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); - MergeExponentialHistogramAggregator.combineIntermediate(state, value.getExponentialHistogram(value.getFirstValueIndex(0), valueScratch)); + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, value.getExponentialHistogram(value.getFirstValueIndex(0), valueScratch)); } @Override @@ -130,7 +130,7 @@ public void evaluateIntermediate(Block[] blocks, int offset, DriverContext drive @Override public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { - blocks[offset] = MergeExponentialHistogramAggregator.evaluateFinal(state, driverContext); + blocks[offset] = HistogramMergeExponentialHistogramAggregator.evaluateFinal(state, driverContext); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java 
b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java similarity index 50% rename from x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java rename to x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java index ab544bfeb2bbe..89caa5adaef3c 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregatorFunctionSupplier.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java @@ -11,37 +11,37 @@ import org.elasticsearch.compute.operator.DriverContext; /** - * {@link AggregatorFunctionSupplier} implementation for {@link MergeExponentialHistogramAggregator}. + * {@link AggregatorFunctionSupplier} implementation for {@link HistogramMergeExponentialHistogramAggregator}. * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. */ -public final class MergeExponentialHistogramAggregatorFunctionSupplier implements AggregatorFunctionSupplier { - public MergeExponentialHistogramAggregatorFunctionSupplier() { +public final class HistogramMergeExponentialHistogramAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public HistogramMergeExponentialHistogramAggregatorFunctionSupplier() { } @Override public List nonGroupingIntermediateStateDesc() { - return MergeExponentialHistogramAggregatorFunction.intermediateStateDesc(); + return HistogramMergeExponentialHistogramAggregatorFunction.intermediateStateDesc(); } @Override public List groupingIntermediateStateDesc() { - return MergeExponentialHistogramGroupingAggregatorFunction.intermediateStateDesc(); + return HistogramMergeExponentialHistogramGroupingAggregatorFunction.intermediateStateDesc(); } @Override - public MergeExponentialHistogramAggregatorFunction aggregator(DriverContext driverContext, - List channels) { - return MergeExponentialHistogramAggregatorFunction.create(driverContext, channels); + public HistogramMergeExponentialHistogramAggregatorFunction aggregator( + DriverContext driverContext, List channels) { + return HistogramMergeExponentialHistogramAggregatorFunction.create(driverContext, channels); } @Override - public MergeExponentialHistogramGroupingAggregatorFunction groupingAggregator( + public HistogramMergeExponentialHistogramGroupingAggregatorFunction groupingAggregator( DriverContext driverContext, List channels) { - return MergeExponentialHistogramGroupingAggregatorFunction.create(channels, driverContext); + return HistogramMergeExponentialHistogramGroupingAggregatorFunction.create(channels, driverContext); } @Override public String describe() { - return "merge_exponential of histograms"; + return "histogram_merge_exponential of histograms"; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramGroupingAggregatorFunction.java similarity index 85% rename from x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java rename to 
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramGroupingAggregatorFunction.java index 176b68f47f1e2..cfbfdf6d222d4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MergeExponentialHistogramGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramGroupingAggregatorFunction.java @@ -21,10 +21,10 @@ import org.elasticsearch.exponentialhistogram.ExponentialHistogram; /** - * {@link GroupingAggregatorFunction} implementation for {@link MergeExponentialHistogramAggregator}. + * {@link GroupingAggregatorFunction} implementation for {@link HistogramMergeExponentialHistogramAggregator}. * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. */ -public final class MergeExponentialHistogramGroupingAggregatorFunction implements GroupingAggregatorFunction { +public final class HistogramMergeExponentialHistogramGroupingAggregatorFunction implements GroupingAggregatorFunction { private static final List INTERMEDIATE_STATE_DESC = List.of( new IntermediateStateDesc("value", ElementType.EXPONENTIAL_HISTOGRAM) ); @@ -34,16 +34,16 @@ public final class MergeExponentialHistogramGroupingAggregatorFunction implement private final DriverContext driverContext; - public MergeExponentialHistogramGroupingAggregatorFunction(List channels, + public HistogramMergeExponentialHistogramGroupingAggregatorFunction(List channels, ExponentialHistogramStates.GroupingState state, DriverContext driverContext) { this.channels = channels; this.state = state; this.driverContext = driverContext; } - public static MergeExponentialHistogramGroupingAggregatorFunction create(List channels, - DriverContext driverContext) { - return new MergeExponentialHistogramGroupingAggregatorFunction(channels, MergeExponentialHistogramAggregator.initGrouping(driverContext.bigArrays(), driverContext), driverContext); + public static HistogramMergeExponentialHistogramGroupingAggregatorFunction create( + List channels, DriverContext driverContext) { + return new HistogramMergeExponentialHistogramGroupingAggregatorFunction(channels, HistogramMergeExponentialHistogramAggregator.initGrouping(driverContext.bigArrays(), driverContext), driverContext); } public static List intermediateStateDesc() { @@ -101,7 +101,7 @@ private void addRawInput(int positionOffset, IntArrayBlock groups, int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); - MergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + HistogramMergeExponentialHistogramAggregator.combine(state, groupId, valueValue); } } } @@ -126,7 +126,7 @@ public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page for (int g = groupStart; g < groupEnd; g++) { int groupId = groups.getInt(g); int valuesPosition = groupPosition + positionOffset; - MergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); } } } @@ -150,7 +150,7 @@ private void addRawInput(int positionOffset, IntBigArrayBlock 
groups, int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); - MergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + HistogramMergeExponentialHistogramAggregator.combine(state, groupId, valueValue); } } } @@ -175,7 +175,7 @@ public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Pa for (int g = groupStart; g < groupEnd; g++) { int groupId = groups.getInt(g); int valuesPosition = groupPosition + positionOffset; - MergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); } } } @@ -193,7 +193,7 @@ private void addRawInput(int positionOffset, IntVector groups, int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); - MergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + HistogramMergeExponentialHistogramAggregator.combine(state, groupId, valueValue); } } } @@ -211,7 +211,7 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int groupId = groups.getInt(groupPosition); int valuesPosition = groupPosition + positionOffset; - MergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); } } @@ -235,7 +235,7 @@ public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) @Override public void evaluateFinal(Block[] blocks, int offset, IntVector selected, GroupingAggregatorEvaluationContext ctx) { - blocks[offset] = MergeExponentialHistogramAggregator.evaluateFinal(state, selected, ctx); + blocks[offset] = HistogramMergeExponentialHistogramAggregator.evaluateFinal(state, selected, ctx); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregator.java similarity index 97% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java rename to x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregator.java index 4796de9cf3a61..f3bc2871b53a8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MergeExponentialHistogramAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregator.java @@ -18,7 +18,7 @@ @Aggregator({ @IntermediateState(name = "value", type = "EXPONENTIAL_HISTOGRAM"), }) @GroupingAggregator -public class 
MergeExponentialHistogramAggregator { +public class HistogramMergeExponentialHistogramAggregator { public static ExponentialHistogramStates.SingleState initSingle(DriverContext driverContext) { return new ExponentialHistogramStates.SingleState(driverContext.breaker()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index bb3b539c7ea53..4ca37d3c10137 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -52,7 +52,7 @@ public static List getNamedWriteables() { Absent.ENTRY, AbsentOverTime.ENTRY, DimensionValues.ENTRY, - Merge.ENTRY + HistogramMerge.ENTRY ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java similarity index 72% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java index 5169db06b7792..6e0affcb46d34 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Merge.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java @@ -10,7 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; -import org.elasticsearch.compute.aggregation.MergeExponentialHistogramAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.HistogramMergeExponentialHistogramAggregatorFunctionSupplier; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -34,19 +34,19 @@ * Note that this function is currently only intended for usage in surrogates and not available as a user-facing function. * Therefore, it is intentionally not registered in {@link org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry}. 
*/ -public class Merge extends AggregateFunction implements ToAggregator { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Merge", Merge::new); +public class HistogramMerge extends AggregateFunction implements ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Merge", HistogramMerge::new); @FunctionInfo(returnType = "exponential_histogram", type = FunctionType.AGGREGATE) - public Merge(Source source, @Param(name = "histogram", type = { "exponential_histogram" }) Expression field) { + public HistogramMerge(Source source, @Param(name = "histogram", type = { "exponential_histogram" }) Expression field) { this(source, field, Literal.TRUE); } - public Merge(Source source, Expression field, Expression filter) { + public HistogramMerge(Source source, Expression field, Expression filter) { super(source, field, filter, NO_WINDOW, emptyList()); } - private Merge(StreamInput in) throws IOException { + private HistogramMerge(StreamInput in) throws IOException { super(in); } @@ -66,24 +66,24 @@ protected TypeResolution resolveType() { } @Override - protected NodeInfo info() { - return NodeInfo.create(this, Merge::new, field(), filter()); + protected NodeInfo info() { + return NodeInfo.create(this, HistogramMerge::new, field(), filter()); } @Override - public Merge replaceChildren(List newChildren) { - return new Merge(source(), newChildren.get(0), newChildren.get(1)); + public HistogramMerge replaceChildren(List newChildren) { + return new HistogramMerge(source(), newChildren.get(0), newChildren.get(1)); } - public Merge withFilter(Expression filter) { - return new Merge(source(), field(), filter); + public HistogramMerge withFilter(Expression filter) { + return new HistogramMerge(source(), field(), filter); } @Override public final AggregatorFunctionSupplier supplier() { DataType type = field().dataType(); if (type == DataType.EXPONENTIAL_HISTOGRAM) { - return new MergeExponentialHistogramAggregatorFunctionSupplier(); + return new HistogramMergeExponentialHistogramAggregatorFunctionSupplier(); } throw EsqlIllegalArgumentException.illegalDataType(type); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java index cf7d28aa4e6c7..ff13b29df75f5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java @@ -170,7 +170,7 @@ public Expression surrogate() { var field = field(); if (field.dataType() == DataType.EXPONENTIAL_HISTOGRAM) { - return new HistogramPercentile(source(), new Merge(source(), field, filter()), percentile()); + return new HistogramPercentile(source(), new HistogramMerge(source(), field, filter()), percentile()); } if (field.foldable()) { return new MvPercentile(source(), new ToDouble(source(), field), percentile()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java similarity index 87% rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java 
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java
similarity index 87%
rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java
rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java
index 1a1ed18182e6c..f7867ffd651cd 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeErrorTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java
@@ -21,7 +21,7 @@
 
 import static org.hamcrest.Matchers.equalTo;
 
-public class MergeErrorTests extends ErrorsForCasesWithoutExamplesTestCase {
+public class HistogramMergeErrorTests extends ErrorsForCasesWithoutExamplesTestCase {
 
     @Before
     public void setup() {
@@ -33,12 +33,12 @@ public void setup() {
 
     @Override
     protected List cases() {
-        return paramsToSuppliers(MergeTests.parameters());
+        return paramsToSuppliers(HistogramMergeTests.parameters());
     }
 
     @Override
     protected Expression build(Source source, List args) {
-        return new Merge(source, args.get(0));
+        return new HistogramMerge(source, args.get(0));
     }
 
     @Override
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java
similarity index 51%
rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java
rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java
index 7136aa113e84b..e0736b4176eea 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeSerializationTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java
@@ -11,14 +11,14 @@
 
 import java.io.IOException;
 
-public class MergeSerializationTests extends AbstractExpressionSerializationTests {
+public class HistogramMergeSerializationTests extends AbstractExpressionSerializationTests {
     @Override
-    protected Merge createTestInstance() {
-        return new Merge(randomSource(), randomChild());
+    protected HistogramMerge createTestInstance() {
+        return new HistogramMerge(randomSource(), randomChild());
     }
 
     @Override
-    protected Merge mutateInstance(Merge instance) throws IOException {
-        return new Merge(instance.source(), randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild));
+    protected HistogramMerge mutateInstance(HistogramMerge instance) throws IOException {
+        return new HistogramMerge(instance.source(), randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild));
     }
 }
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeTests.java
similarity index 95%
rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java
rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeTests.java
index d912130af0ab0..a778cd57bed71 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MergeTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeTests.java
@@ -33,8 +33,8 @@
 import static org.elasticsearch.compute.aggregation.ExponentialHistogramStates.MAX_BUCKET_COUNT;
 import static org.hamcrest.Matchers.equalTo;
 
-public class MergeTests extends AbstractAggregationTestCase {
-    public MergeTests(@Name("TestCase") Supplier testCaseSupplier) {
+public class HistogramMergeTests extends AbstractAggregationTestCase {
+    public HistogramMergeTests(@Name("TestCase") Supplier testCaseSupplier) {
         this.testCase = testCaseSupplier.get();
     }
 
@@ -44,7 +44,7 @@ public static Iterable parameters() {
 
         Stream.of(MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100))
             .flatMap(List::stream)
-            .map(MergeTests::makeSupplier)
+            .map(HistogramMergeTests::makeSupplier)
             .collect(Collectors.toCollection(() -> suppliers));
 
         return parameterSuppliersFromTypedDataWithDefaultChecks(suppliers, true);
@@ -52,7 +52,7 @@ public static Iterable parameters() {
 
     @Override
     protected Expression build(Source source, List args) {
-        return new Merge(source, args.get(0));
+        return new HistogramMerge(source, args.get(0));
     }
 
     private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier fieldSupplier) {
@@ -78,7 +78,7 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier
         var expected = anyValuesNonNull ? merger.get() : null;
         return new TestCaseSupplier.TestCase(
             List.of(fieldTypedData),
-            standardAggregatorName("Merge", fieldSupplier.type()),
+            standardAggregatorName("HistogramMerge", fieldSupplier.type()),
             DataType.EXPONENTIAL_HISTOGRAM,
             equalToWithLenientZeroBucket(expected)
         );
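The omitted middle of makeSupplier folds every non-null input histogram into a single expected value; the "expected = anyValuesNonNull ? merger.get() : null" line above is the tail end of that fold. A compact sketch of its shape follows, using a stand-in Merger interface, since only merger.get() is visible in the hunk and add(...) is an assumption.

// Sketch only, not from the patch: the shape of the fold that produces "expected" in
// makeSupplier. Merger is a stand-in accumulator; only merger.get() appears in the hunk
// above, so add(...) is an assumption.
import java.util.List;

import org.elasticsearch.exponentialhistogram.ExponentialHistogram;

final class ExpectedValueSketch {
    interface Merger {
        void add(ExponentialHistogram histogram);
        ExponentialHistogram get();
    }

    static ExponentialHistogram expected(List<ExponentialHistogram> rows, Merger merger) {
        boolean anyValuesNonNull = false;
        for (ExponentialHistogram row : rows) {
            if (row != null) {
                anyValuesNonNull = true;
                merger.add(row);
            }
        }
        // A test case whose rows are all null expects a null result, not an empty histogram.
        return anyValuesNonNull ? merger.get() : null;
    }
}

The equalToWithLenientZeroBucket matcher above then compares the aggregator's output against this expected histogram.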
From 46f3e1fd41d02b10b71a67a5119a90ccea1fbb25 Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Thu, 13 Nov 2025 07:12:56 +0000
Subject: [PATCH 11/11] [CI] Auto commit changes from spotless

---
 .../esql/expression/function/aggregate/HistogramMerge.java | 6 +++++-
 .../aggregate/HistogramMergeSerializationTests.java        | 5 ++++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java
index 6e0affcb46d34..daf1c06144f74 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java
@@ -35,7 +35,7 @@
  * Therefore, it is intentionally not registered in {@link org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry}.
  */
 public class HistogramMerge extends AggregateFunction implements ToAggregator {
-    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Merge", HistogramMerge::new);
+    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+        Expression.class,
+        "Merge",
+        HistogramMerge::new
+    );
 
     @FunctionInfo(returnType = "exponential_histogram", type = FunctionType.AGGREGATE)
     public HistogramMerge(Source source, @Param(name = "histogram", type = { "exponential_histogram" }) Expression field) {
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java
index e0736b4176eea..ca6c58ba6da93 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java
@@ -19,6 +19,9 @@ protected HistogramMerge createTestInstance() {
 
     @Override
     protected HistogramMerge mutateInstance(HistogramMerge instance) throws IOException {
-        return new HistogramMerge(instance.source(), randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild));
+        return new HistogramMerge(
+            instance.source(),
+            randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild)
+        );
     }
 }
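For readers tracing the serialization tests: they ultimately exercise the NamedWriteableRegistry entry reformatted in the first hunk of this commit, whose wire name stays "Merge" while the reader is the HistogramMerge(StreamInput) constructor. The following is a much-simplified sketch of the lookup a round trip relies on; the wrapper class, method name, package placement, and the bare StreamInput are illustrative, and real plans go through ESQL's plan stream machinery rather than a direct getReader call.

// Sketch only, not part of the patch: the registry lookup behind the round trip.
// HistogramMerge.ENTRY and the wire name "Merge" come from the hunks above; everything
// else here is illustrative.
package org.elasticsearch.xpack.esql.expression.function.aggregate;

import java.io.IOException;
import java.util.List;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;

final class WireNameSketch {
    // Resolves the wire name "Merge" to the HistogramMerge(StreamInput) reader and reads one back.
    static Expression readBack(StreamInput in) throws IOException {
        NamedWriteableRegistry registry = new NamedWriteableRegistry(List.of(HistogramMerge.ENTRY));
        return registry.getReader(Expression.class, "Merge").read(in);
    }
}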