diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunction.java new file mode 100644 index 0000000000000..85a2298a76a2d --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunction.java @@ -0,0 +1,149 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramScratch; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +/** + * {@link AggregatorFunction} implementation for {@link HistogramMergeExponentialHistogramAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. 
+ */ +public final class HistogramMergeExponentialHistogramAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("value", ElementType.EXPONENTIAL_HISTOGRAM) ); + + private final DriverContext driverContext; + + private final ExponentialHistogramStates.SingleState state; + + private final List channels; + + public HistogramMergeExponentialHistogramAggregatorFunction(DriverContext driverContext, + List channels, ExponentialHistogramStates.SingleState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static HistogramMergeExponentialHistogramAggregatorFunction create( + DriverContext driverContext, List channels) { + return new HistogramMergeExponentialHistogramAggregatorFunction(driverContext, channels, HistogramMergeExponentialHistogramAggregator.initSingle(driverContext)); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + } else if (mask.allTrue()) { + addRawInputNotMasked(page); + } else { + addRawInputMasked(page, mask); + } + } + + private void addRawInputMasked(Page page, BooleanVector mask) { + ExponentialHistogramBlock valueBlock = page.getBlock(channels.get(0)); + addRawBlock(valueBlock, mask); + } + + private void addRawInputNotMasked(Page page) { + ExponentialHistogramBlock valueBlock = page.getBlock(channels.get(0)); + addRawBlock(valueBlock); + } + + private void addRawBlock(ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + int valueValueCount = valueBlock.getValueCount(p); + if (valueValueCount == 0) { + continue; 
+ } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueValueCount; + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + HistogramMergeExponentialHistogramAggregator.combine(state, valueValue); + } + } + } + + private void addRawBlock(ExponentialHistogramBlock valueBlock, BooleanVector mask) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + int valueValueCount = valueBlock.getValueCount(p); + if (valueValueCount == 0) { + continue; + } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueValueCount; + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + HistogramMergeExponentialHistogramAggregator.combine(state, valueValue); + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + assert value.getPositionCount() == 1; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, value.getExponentialHistogram(value.getFirstValueIndex(0), valueScratch)); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int 
offset, DriverContext driverContext) { + blocks[offset] = HistogramMergeExponentialHistogramAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..89caa5adaef3c --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link HistogramMergeExponentialHistogramAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. 
+ */ +public final class HistogramMergeExponentialHistogramAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public HistogramMergeExponentialHistogramAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + return HistogramMergeExponentialHistogramAggregatorFunction.intermediateStateDesc(); + } + + @Override + public List groupingIntermediateStateDesc() { + return HistogramMergeExponentialHistogramGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public HistogramMergeExponentialHistogramAggregatorFunction aggregator( + DriverContext driverContext, List channels) { + return HistogramMergeExponentialHistogramAggregatorFunction.create(driverContext, channels); + } + + @Override + public HistogramMergeExponentialHistogramGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return HistogramMergeExponentialHistogramGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "histogram_merge_exponential of histograms"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..cfbfdf6d222d4 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramGroupingAggregatorFunction.java @@ -0,0 +1,254 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. 
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramScratch; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link HistogramMergeExponentialHistogramAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class HistogramMergeExponentialHistogramGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("value", ElementType.EXPONENTIAL_HISTOGRAM) ); + + private final ExponentialHistogramStates.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public HistogramMergeExponentialHistogramGroupingAggregatorFunction(List channels, + ExponentialHistogramStates.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static HistogramMergeExponentialHistogramGroupingAggregatorFunction create( + List channels, DriverContext driverContext) { + return new HistogramMergeExponentialHistogramGroupingAggregatorFunction(channels, HistogramMergeExponentialHistogramAggregator.initGrouping(driverContext.bigArrays(), driverContext), driverContext); + } + + public static List 
intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + ExponentialHistogramBlock valueBlock = page.getBlock(channels.get(0)); + maybeEnableGroupIdTracking(seenGroupIds, valueBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, + ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + HistogramMergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + } + } + } 
+ } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, + ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = 
valueBlock.getExponentialHistogram(valueOffset, valueScratch); + HistogramMergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, + ExponentialHistogramBlock valueBlock) { + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + ExponentialHistogram valueValue = valueBlock.getExponentialHistogram(valueOffset, valueScratch); + 
HistogramMergeExponentialHistogramAggregator.combine(state, groupId, valueValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + ExponentialHistogramBlock value = (ExponentialHistogramBlock) valueUncast; + ExponentialHistogramScratch valueScratch = new ExponentialHistogramScratch(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + HistogramMergeExponentialHistogramAggregator.combineIntermediate(state, groupId, value.getExponentialHistogram(value.getFirstValueIndex(valuesPosition), valueScratch)); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, + ExponentialHistogramBlock valueBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = HistogramMergeExponentialHistogramAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java new file mode 100644 index 0000000000000..08eed3e453e98 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ExponentialHistogramStates.java @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; + +public final class ExponentialHistogramStates { + + // We currently use a hardcoded limit for the number of buckets, we might make this configurable / an aggregation parameter later + // The current default is what's also used by the OpenTelemetry SDKs + public static final int MAX_BUCKET_COUNT = 320; + + private record HistoBreaker(CircuitBreaker delegate) implements ExponentialHistogramCircuitBreaker { + @Override + public void adjustBreaker(long bytesAllocated) { + if (bytesAllocated < 0) { + delegate.addWithoutBreaking(bytesAllocated); + } else { + delegate.addEstimateBytesAndMaybeBreak(bytesAllocated, 
"ExponentialHistogram aggregation state"); + } + } + } + + private ExponentialHistogramStates() {} + + static final class SingleState implements AggregatorState { + + private final CircuitBreaker breaker; + // initialize lazily + private ExponentialHistogramMerger merger; + + SingleState(CircuitBreaker breaker) { + this.breaker = breaker; + } + + public void add(ExponentialHistogram histogram, boolean allowUpscale) { + if (histogram == null) { + return; + } + if (merger == null) { + merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, new HistoBreaker(breaker)); + } + if (allowUpscale) { + merger.add(histogram); + } else { + merger.addWithoutUpscaling(histogram); + } + } + + @Override + public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + assert blocks.length >= offset + 1; + blocks[offset] = evaluateFinal(driverContext); + } + + @Override + public void close() { + Releasables.close(merger); + merger = null; + } + + public Block evaluateFinal(DriverContext driverContext) { + BlockFactory blockFactory = driverContext.blockFactory(); + if (merger == null) { + return blockFactory.newConstantNullBlock(1); + } else { + return blockFactory.newConstantExponentialHistogramBlock(merger.get(), 1); + } + } + } + + static final class GroupingState implements GroupingAggregatorState { + + private ObjectArray states; + private final HistoBreaker breaker; + private final BigArrays bigArrays; + + GroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.states = bigArrays.newObjectArray(1); + this.bigArrays = bigArrays; + this.breaker = new HistoBreaker(breaker); + } + + ExponentialHistogramMerger getOrNull(int position) { + if (position < states.size()) { + return states.get(position); + } else { + return null; + } + } + + public void add(int groupId, ExponentialHistogram histogram, boolean allowUpscale) { + if (histogram == null) { + return; + } + ensureCapacity(groupId); + var state = states.get(groupId); + if (state == null) 
{ + state = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, breaker); + states.set(groupId, state); + } + if (allowUpscale) { + state.add(histogram); + } else { + state.addWithoutUpscaling(histogram); + } + } + + private void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 1 : "blocks=" + blocks.length + ",offset=" + offset; + blocks[offset] = evaluateFinal(selected, driverContext); + } + + public Block evaluateFinal(IntVector selected, DriverContext driverContext) { + try (var builder = driverContext.blockFactory().newExponentialHistogramBlockBuilder(selected.getPositionCount());) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int groupId = selected.getInt(i); + ExponentialHistogramMerger state = getOrNull(groupId); + if (state != null) { + builder.append(state.get()); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void close() { + for (int i = 0; i < states.size(); i++) { + Releasables.close(states.get(i)); + } + Releasables.close(states); + states = null; + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregator.java new file mode 100644 index 0000000000000..f3bc2871b53a8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HistogramMergeExponentialHistogramAggregator.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +@Aggregator({ @IntermediateState(name = "value", type = "EXPONENTIAL_HISTOGRAM"), }) +@GroupingAggregator +public class HistogramMergeExponentialHistogramAggregator { + + public static ExponentialHistogramStates.SingleState initSingle(DriverContext driverContext) { + return new ExponentialHistogramStates.SingleState(driverContext.breaker()); + } + + public static void combine(ExponentialHistogramStates.SingleState state, ExponentialHistogram value) { + state.add(value, true); + } + + public static void combineIntermediate(ExponentialHistogramStates.SingleState state, ExponentialHistogram value) { + state.add(value, false); + } + + public static Block evaluateFinal(ExponentialHistogramStates.SingleState state, DriverContext driverContext) { + return state.evaluateFinal(driverContext); + } + + public static ExponentialHistogramStates.GroupingState initGrouping(BigArrays bigArrays, DriverContext driverContext) { + return new ExponentialHistogramStates.GroupingState(bigArrays, driverContext.breaker()); + } + + public static void combine(ExponentialHistogramStates.GroupingState current, int groupId, ExponentialHistogram value) { + current.add(groupId, value, true); + } + + public static void combineIntermediate(ExponentialHistogramStates.GroupingState state, int groupId, ExponentialHistogram value) { + 
state.add(groupId, value, false); + } + + public static Block evaluateFinal( + ExponentialHistogramStates.GroupingState state, + IntVector selected, + GroupingAggregatorEvaluationContext ctx + ) { + return state.evaluateFinal(selected, ctx.driverContext()); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java index f8aa1a708a3f9..7eb0ce01bf59c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java @@ -103,8 +103,6 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) { // We should add a dedicated encoding when building a block from computed histograms which do not originate from doc values // That encoding should be optimized for speed and support storing the zero threshold as (scale, index) pair ZeroBucket zeroBucket = histogram.zeroBucket(); - assert zeroBucket.compareZeroThreshold(ZeroBucket.minimalEmpty()) == 0 || zeroBucket.isIndexBased() == false - : "Current encoding only supports double-based zero thresholds"; BytesStreamOutput encodedBytes = new BytesStreamOutput(); try { diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index 53a89c642a919..9f0a0be60d5f7 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -29,8 +29,10 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; import 
org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ZeroBucket; import org.hamcrest.Matcher; import java.util.ArrayList; @@ -38,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.DoubleStream; import java.util.stream.IntStream; import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; @@ -380,6 +383,7 @@ public static Page convertBytesRefsToOrdinals(Page page) { public static ExponentialHistogram randomExponentialHistogram() { // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils + int numBuckets = randomIntBetween(4, 300); boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = randomBoolean(); @@ -390,13 +394,23 @@ public static ExponentialHistogram randomExponentialHistogram() { ), hasZeroValues ? 
IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - - int numBuckets = randomIntBetween(4, 300); ReleasableExponentialHistogram histo = ExponentialHistogram.create( numBuckets, ExponentialHistogramCircuitBreaker.noop(), rawValues ); + // Setup a proper zeroThreshold based on a random chance + if (histo.zeroBucket().count() > 0 && randomBoolean()) { + double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); + double zeroThreshold = smallestNonZeroValue * randomDouble(); + try (ReleasableExponentialHistogram releaseAfterCopy = histo) { + ZeroBucket zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count()); + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(zeroBucket); + histo = builder.build(); + } + } + // Return the histogram directly; unlike EsqlTestUtils, no writeable wrapper is needed here return histo; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 8741fbdb079a9..c17455515920c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -45,8 +45,10 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ZeroBucket;
import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.utils.Geohash; @@ -167,6 +169,7 @@ import java.util.TreeMap; import java.util.function.Predicate; import java.util.jar.JarInputStream; +import java.util.stream.DoubleStream; import java.util.stream.IntStream; import java.util.zip.ZipEntry; @@ -1056,6 +1059,7 @@ public static Literal randomLiteral(DataType type) { public static ExponentialHistogram randomExponentialHistogram() { // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils + int numBuckets = randomIntBetween(4, 300); boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = randomBoolean(); @@ -1067,12 +1071,22 @@ public static ExponentialHistogram randomExponentialHistogram() { hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - int numBuckets = randomIntBetween(4, 300); ReleasableExponentialHistogram histo = ExponentialHistogram.create( numBuckets, ExponentialHistogramCircuitBreaker.noop(), rawValues ); + // Setup a proper zeroThreshold based on a random chance + if (histo.zeroBucket().count() > 0 && randomBoolean()) { + double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); + double zeroThreshold = smallestNonZeroValue * randomDouble(); + try (ReleasableExponentialHistogram releaseAfterCopy = histo) { + ZeroBucket zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count()); + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(zeroBucket); + histo = builder.build(); + } + } // Make the result histogram writeable to 
allow usage in Literals for testing return new WriteableExponentialHistogram(histo); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java index bbcdb989e7f8a..65264e8456c38 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java @@ -41,7 +41,7 @@ public class WriteableExponentialHistogram extends AbstractExponentialHistogram private final ExponentialHistogram delegate; - WriteableExponentialHistogram(ExponentialHistogram delegate) { + public WriteableExponentialHistogram(ExponentialHistogram delegate) { this.delegate = delegate; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec index 475e568230279..53e0158f945b4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/exponential_histogram.csv-spec @@ -13,3 +13,32 @@ dummy-positive_only | "{""scale"":2,""sum"":1275.0,""min"":1.0,""max"":50. 
dummy-zero_count_only | "{""scale"":2,""sum"":0.0,""min"":0.0,""max"":0.0,""zero"":{""count"":101}}" dummy-zero_threshold_only | "{""scale"":0,""sum"":0.0,""zero"":{""threshold"":2.0E-5}}" ; + +ungroupedPercentiles +required_capability: exponential_histogram_percentiles_support + +FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") + | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) + | EVAL p50 = ROUND(p50, 7), p99 = ROUND(p99, 7) // rounding to avoid floating point precision issues, min and max are exact so no rounding needed + | KEEP p0, p50, p99, p100 +; + +p0:double | p50:double | p99:double | p100:double +2.17E-4 | 0.0016965 | 0.9472324 | 6.786232 +; + +groupedPercentiles +required_capability: exponential_histogram_percentiles_support + +FROM exp_histo_sample | WHERE NOT STARTS_WITH(instance, "dummy") + | STATS p0 = PERCENTILE(responseTime,0), p50 = PERCENTILE(responseTime,50), p99 = PERCENTILE(responseTime, 99), p100 = PERCENTILE(responseTime,100) BY instance + | EVAL p50 = ROUND(p50, 7), p99 = ROUND(p99, 7) // rounding to avoid floating point precision issues, min and max are exact so no rounding needed + | KEEP instance, p0, p50, p99, p100 + | SORT instance +; + +instance:keyword | p0:double | p50:double | p99:double | p100:double +instance-0 | 2.4E-4 | 0.0211404 | 1.0432946 | 6.786232 +instance-1 | 2.17E-4 | 6.469E-4 | 0.1422151 | 3.190723 +instance-2 | 2.2E-4 | 6.469E-4 | 0.0857672 | 2.7059714542564097 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 9bd0979676fd1..68081ea2872e1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1642,6 
+1642,11 @@ public enum Cap { FULL_TEXT_FUNCTIONS_ACCEPT_NULL_FIELD, + /** + * Support for exponential_histogram type in PERCENTILES aggregation. + */ + EXPONENTIAL_HISTOGRAM_PERCENTILES_SUPPORT(EXPONENTIAL_HISTOGRAM_FEATURE_FLAG), + // Last capability should still have a comma for fewer merge conflicts when adding new ones :) // This comment prevents the semicolon from being on the previous capability when Spotless formats the file. ; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 5b3bddd89d093..4ca37d3c10137 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -51,7 +51,8 @@ public static List getNamedWriteables() { PresentOverTime.ENTRY, Absent.ENTRY, AbsentOverTime.ENTRY, - DimensionValues.ENTRY + DimensionValues.ENTRY, + HistogramMerge.ENTRY ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java new file mode 100644 index 0000000000000..daf1c06144f74 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMerge.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.HistogramMergeExponentialHistogramAggregatorFunctionSupplier; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.planner.ToAggregator; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; + +/** + * Merges multiple histograms into a single histogram. + * Note that this function is currently only intended for usage in surrogates and not available as a user-facing function. + * Therefore, it is intentionally not registered in {@link org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry}. 
+ */ +public class HistogramMerge extends AggregateFunction implements ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "Merge", + HistogramMerge::new + ); + + @FunctionInfo(returnType = "exponential_histogram", type = FunctionType.AGGREGATE) + public HistogramMerge(Source source, @Param(name = "histogram", type = { "exponential_histogram" }) Expression field) { + this(source, field, Literal.TRUE); + } + + public HistogramMerge(Source source, Expression field, Expression filter) { + super(source, field, filter, NO_WINDOW, emptyList()); + } + + private HistogramMerge(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public DataType dataType() { + return DataType.EXPONENTIAL_HISTOGRAM; + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> dt == DataType.EXPONENTIAL_HISTOGRAM, sourceText(), DEFAULT, "exponential_histogram"); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, HistogramMerge::new, field(), filter()); + } + + @Override + public HistogramMerge replaceChildren(List newChildren) { + return new HistogramMerge(source(), newChildren.get(0), newChildren.get(1)); + } + + public HistogramMerge withFilter(Expression filter) { + return new HistogramMerge(source(), field(), filter); + } + + @Override + public final AggregatorFunctionSupplier supplier() { + DataType type = field().dataType(); + if (type == DataType.EXPONENTIAL_HISTOGRAM) { + return new HistogramMergeExponentialHistogramAggregatorFunctionSupplier(); + } + throw EsqlIllegalArgumentException.illegalDataType(type); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java index 
23b55c429c009..ff13b29df75f5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.esql.expression.function.FunctionType; import org.elasticsearch.xpack.esql.expression.function.Param; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDouble; +import org.elasticsearch.xpack.esql.expression.function.scalar.histogram.HistogramPercentile; import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvPercentile; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; @@ -74,7 +75,7 @@ public class Percentile extends NumericAggregate implements SurrogateExpression ) public Percentile( Source source, - @Param(name = "number", type = { "double", "integer", "long" }) Expression field, + @Param(name = "number", type = { "double", "integer", "long", "exponential_histogram" }) Expression field, @Param(name = "percentile", type = { "double", "integer", "long" }) Expression percentile ) { this(source, field, Literal.TRUE, NO_WINDOW, percentile); @@ -127,10 +128,10 @@ protected TypeResolution resolveType() { TypeResolution resolution = isType( field(), - dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, + dt -> (dt.isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.EXPONENTIAL_HISTOGRAM, sourceText(), FIRST, - "numeric except unsigned_long" + "exponential_histogram or numeric except unsigned_long" ); if (resolution.unresolved()) { return resolution; @@ -168,6 +169,9 @@ private double percentileValue() { public Expression surrogate() { var field = field(); + if (field.dataType() == DataType.EXPONENTIAL_HISTOGRAM) { + return new HistogramPercentile(source(), new HistogramMerge(source(), field, filter()), percentile()); + } if (field.foldable()) { return new MvPercentile(source(), new 
ToDouble(source(), field), percentile()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java index 8d4e5ab0346dd..3981b71f316b0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java @@ -112,7 +112,8 @@ private static DataType toDataType(ElementType elementType) { case LONG -> DataType.LONG; case DOUBLE -> DataType.DOUBLE; case DOC -> DataType.DOC_DATA_TYPE; - case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new EsqlIllegalArgumentException( + case EXPONENTIAL_HISTOGRAM -> DataType.EXPONENTIAL_HISTOGRAM; + case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new EsqlIllegalArgumentException( "unsupported agg type: " + elementType ); }; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 76eba5fd0eb2e..ee5a2a653b67d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2051,7 +2051,7 @@ public void testUnsupportedTypesInStats() { found value [x] type [unsigned_long] line 2:58: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ found value [x] type [unsigned_long] - line 2:96: first argument of [percentile(x, 10)] must be [numeric except unsigned_long],\ + line 2:96: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long],\ found value [x] type [unsigned_long] line 2:115: argument of [sum(x)] must be [aggregate_metric_double or 
numeric except unsigned_long or counter types],\ found value [x] type [unsigned_long]"""); @@ -2067,7 +2067,8 @@ public void testUnsupportedTypesInStats() { found value [x] type [version] line 2:29: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\ found value [x] type [version] - line 2:59: first argument of [percentile(x, 10)] must be [numeric except unsigned_long], found value [x] type [version] + line 2:59: first argument of [percentile(x, 10)] must be [exponential_histogram or numeric except unsigned_long],\ + found value [x] type [version] line 2:78: argument of [sum(x)] must be [aggregate_metric_double or numeric except unsigned_long or counter types],\ found value [x] type [version]"""); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java index 4d62c04528c55..50b845c8ed328 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAggregationTestCase.java @@ -561,6 +561,7 @@ protected static String standardAggregatorName(String prefix, DataType type) { case IP -> "Ip"; case DATETIME, DATE_NANOS, LONG, COUNTER_LONG, UNSIGNED_LONG, GEOHASH, GEOTILE, GEOHEX -> "Long"; case AGGREGATE_METRIC_DOUBLE -> "AggregateMetricDouble"; + case EXPONENTIAL_HISTOGRAM -> "ExponentialHistogram"; case NULL -> "Null"; default -> throw new UnsupportedOperationException("name for [" + type + "]"); }; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java index 93f650398a605..879f43d2f90ea 100644 
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.Geometry; @@ -21,6 +22,8 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.WriteableExponentialHistogram; +import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.versionfield.Version; @@ -491,6 +494,29 @@ public static List geohexCases(int minRows, int maxRows) { return cases; } + public static List exponentialHistogramCases(int minRows, int maxRows) { + List cases = new ArrayList<>(); + if (EsqlCorePlugin.EXPONENTIAL_HISTOGRAM_FEATURE_FLAG.isEnabled()) { + addSuppliers( + cases, + minRows, + maxRows, + "empty exponential histograms", + DataType.EXPONENTIAL_HISTOGRAM, + () -> new WriteableExponentialHistogram(ExponentialHistogram.empty()) + ); + addSuppliers( + cases, + minRows, + maxRows, + "random exponential histograms", + DataType.EXPONENTIAL_HISTOGRAM, + EsqlTestUtils::randomExponentialHistogram + ); + } + return cases; + } + public static List stringCases(int minRows, int maxRows, DataType type) { List cases = new ArrayList<>(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java new file mode 100644 index 0000000000000..f7867ffd651cd --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeErrorTests.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.ErrorsForCasesWithoutExamplesTestCase; +import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; +import org.hamcrest.Matcher; +import org.junit.Before; + +import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; + +public class HistogramMergeErrorTests extends ErrorsForCasesWithoutExamplesTestCase { + + @Before + public void setup() { + assumeTrue( + "Only when esql_exponential_histogram feature flag is enabled", + EsqlCorePlugin.EXPONENTIAL_HISTOGRAM_FEATURE_FLAG.isEnabled() + ); + } + + @Override + protected List cases() { + return paramsToSuppliers(HistogramMergeTests.parameters()); + } + + @Override + protected Expression build(Source source, List args) { + return new HistogramMerge(source, args.get(0)); + } + + @Override + protected Matcher expectedTypeErrorMatcher(List> validPerPosition, List signature) { + return equalTo(typeErrorMessage(false, validPerPosition, signature, (v, p) -> "exponential_histogram")); + } +} diff --git 
a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java new file mode 100644 index 0000000000000..ca6c58ba6da93 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeSerializationTests.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests; + +import java.io.IOException; + +public class HistogramMergeSerializationTests extends AbstractExpressionSerializationTests { + @Override + protected HistogramMerge createTestInstance() { + return new HistogramMerge(randomSource(), randomChild()); + } + + @Override + protected HistogramMerge mutateInstance(HistogramMerge instance) throws IOException { + return new HistogramMerge( + instance.source(), + randomValueOtherThan(instance.field(), AbstractExpressionSerializationTests::randomChild) + ); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeTests.java new file mode 100644 index 0000000000000..a778cd57bed71 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/HistogramMergeTests.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; +import org.elasticsearch.exponentialhistogram.ZeroBucket; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase; +import org.elasticsearch.xpack.esql.expression.function.MultiRowTestCaseSupplier; +import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.compute.aggregation.ExponentialHistogramStates.MAX_BUCKET_COUNT; +import static org.hamcrest.Matchers.equalTo; + +public class HistogramMergeTests extends AbstractAggregationTestCase { + public HistogramMergeTests(@Name("TestCase") Supplier testCaseSupplier) { + this.testCase = testCaseSupplier.get(); + } + + @ParametersFactory + public static Iterable parameters() { + var suppliers = new ArrayList(); + + Stream.of(MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100)) + .flatMap(List::stream) + .map(HistogramMergeTests::makeSupplier) + .collect(Collectors.toCollection(() -> suppliers)); + + 
return parameterSuppliersFromTypedDataWithDefaultChecks(suppliers, true); + } + + @Override + protected Expression build(Source source, List args) { + return new HistogramMerge(source, args.get(0)); + } + + private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier fieldSupplier) { + return new TestCaseSupplier(List.of(fieldSupplier.type()), () -> { + var fieldTypedData = fieldSupplier.get(); + var fieldValues = fieldTypedData.multiRowData(); + + ExponentialHistogramMerger merger = ExponentialHistogramMerger.create( + MAX_BUCKET_COUNT, + ExponentialHistogramCircuitBreaker.noop() + ); + + boolean anyValuesNonNull = false; + + for (var fieldValue : fieldValues) { + ExponentialHistogram histogram = (ExponentialHistogram) fieldValue; + if (histogram != null) { + anyValuesNonNull = true; + merger.add(histogram); + } + } + + var expected = anyValuesNonNull ? merger.get() : null; + return new TestCaseSupplier.TestCase( + List.of(fieldTypedData), + standardAggregatorName("HistogramMerge", fieldSupplier.type()), + DataType.EXPONENTIAL_HISTOGRAM, + equalToWithLenientZeroBucket(expected) + ); + }); + } + + private static Matcher equalToWithLenientZeroBucket(ExponentialHistogram expected) { + // if there is no zero-threshold involved, merging is deterministic + if (expected.zeroBucket().zeroThreshold() == 0) { + return equalTo(expected); + } + + // if there is a zero-threshold involved, things get a little more hairy + // the exact merge result depends on the order in which downscales happen vs when the highest zero threshold is seen + // this means the zero-bucket can be different to the expected result and the scale can slightly differ + // we fix this by adjusting both histograms to the same scale + + return new BaseMatcher() { + @Override + public boolean matches(Object actualObj) { + if (actualObj instanceof ExponentialHistogram == false) { + return false; + } + ExponentialHistogram actual = (ExponentialHistogram) actualObj; + + // step one: bring both
histograms to the same scale int targetScale = Math.min(actual.scale(), expected.scale()); + ExponentialHistogram a = downscaleTo(actual, targetScale); + ExponentialHistogram b = downscaleTo(expected, targetScale); + + // step two: bring the zero-threshold of both histograms to the same value (the higher one) + ZeroBucket targetZeroBucket; + if (a.zeroBucket().compareZeroThreshold(b.zeroBucket()) >= 0) { + targetZeroBucket = a.zeroBucket(); + } else { + targetZeroBucket = b.zeroBucket(); + } + a = increaseZeroThreshold(a, targetZeroBucket); + b = increaseZeroThreshold(b, targetZeroBucket); + // now they should actually be equal! + return a.equals(b); + } + + @Override + public void describeTo(Description description) { + description.appendValue(expected); + } + }; + } + + private static ExponentialHistogram downscaleTo(ExponentialHistogram histogram, int targetScale) { + assert histogram.scale() >= targetScale; + ExponentialHistogramMerger merger = ExponentialHistogramMerger.createWithMaxScale( + MAX_BUCKET_COUNT, + targetScale, + ExponentialHistogramCircuitBreaker.noop() + ); + merger.addWithoutUpscaling(histogram); + return merger.get(); + } + + private static ExponentialHistogram increaseZeroThreshold(ExponentialHistogram histo, ZeroBucket targetZeroBucket) { + ExponentialHistogramMerger merger = ExponentialHistogramMerger.create(MAX_BUCKET_COUNT, ExponentialHistogramCircuitBreaker.noop()); + merger.addWithoutUpscaling(histo); + // now add a histogram with only the zero-threshold with a count of 1 to trigger merging of overlapping buckets + merger.add( + ExponentialHistogram.builder(ExponentialHistogram.MAX_SCALE, ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(copyWithNewCount(targetZeroBucket, 1)) + .build() + ); + // the merger now has the desired zero-threshold, but we need to subtract the fake zero count again + ExponentialHistogram mergeResult = merger.get(); + return ExponentialHistogram.builder(mergeResult,
ExponentialHistogramCircuitBreaker.noop()) + .zeroBucket(copyWithNewCount(mergeResult.zeroBucket(), mergeResult.zeroBucket().count() - 1)) + .build(); + } + + private static ZeroBucket copyWithNewCount(ZeroBucket zb, long newCount) { + if (zb.isIndexBased()) { + return ZeroBucket.create(zb.index(), zb.scale(), newCount); + } else { + return ZeroBucket.create(zb.zeroThreshold(), newCount); + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java index b2f701f41792b..4c0440de3d8af 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileErrorTests.java @@ -32,6 +32,10 @@ protected Expression build(Source source, List args) { @Override protected Matcher expectedTypeErrorMatcher(List> validPerPosition, List signature) { - return equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> "numeric except unsigned_long")); + return equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> switch (p) { + case 0 -> "exponential_histogram or numeric except unsigned_long"; + case 1 -> "numeric except unsigned_long"; + default -> throw new IllegalStateException("Unexpected argument index " + p); + })); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java index bd8f6e96cff32..e0a2ab40879dc 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java @@ -11,6 +11,10 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Types; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile; import org.elasticsearch.search.aggregations.metrics.TDigestState; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -21,9 +25,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Stream; +import static org.elasticsearch.compute.aggregation.ExponentialHistogramStates.MAX_BUCKET_COUNT; import static org.hamcrest.Matchers.equalTo; public class PercentileTests extends AbstractAggregationTestCase { @@ -38,7 +44,8 @@ public static Iterable parameters() { var fieldCases = Stream.of( MultiRowTestCaseSupplier.intCases(1, 1000, Integer.MIN_VALUE, Integer.MAX_VALUE, true), MultiRowTestCaseSupplier.longCases(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE, true), - MultiRowTestCaseSupplier.doubleCases(1, 1000, -Double.MAX_VALUE, Double.MAX_VALUE, true) + MultiRowTestCaseSupplier.doubleCases(1, 1000, -Double.MAX_VALUE, Double.MAX_VALUE, true), + MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100) ).flatMap(List::stream).toList(); var percentileCases = Stream.of( @@ -71,20 +78,42 @@ private static TestCaseSupplier makeSupplier( var percentile = ((Number) percentileTypedData.data()).doubleValue(); - try (var digest = TDigestState.create(newLimitedBreaker(ByteSizeValue.ofMb(100)), 1000)) { - for (var value : fieldTypedData.multiRowData()) { - digest.add(((Number) value).doubleValue()); - } + Double 
expected; + if (fieldTypedData.type() == DataType.EXPONENTIAL_HISTOGRAM) { + // Note that the merging used underneath can be dependent on the order if zero-buckets are involved + // therefore the percentile in theory could vary slightly + // however, it seems that the order is the same in the tests vs the reference computation + // if we ever encounter flakes here, we should replace the equalTo() assertion with an assertion on the relative error + expected = getExpectedPercentileForExponentialHistograms(Types.forciblyCast(fieldTypedData.multiRowData()), percentile); + } else { + expected = getExpectedPercentileForNumbers(Types.forciblyCast(fieldTypedData.multiRowData()), percentile); + } - var expected = digest.size() == 0 ? null : digest.quantile(percentile / 100); + return new TestCaseSupplier.TestCase( + List.of(fieldTypedData, percentileTypedData), + standardAggregatorName("Percentile", fieldSupplier.type()), + DataType.DOUBLE, + equalTo(expected) + ); + }); + } - return new TestCaseSupplier.TestCase( - List.of(fieldTypedData, percentileTypedData), - standardAggregatorName("Percentile", fieldSupplier.type()), - DataType.DOUBLE, - equalTo(expected) - ); + private static Double getExpectedPercentileForNumbers(List values, double percentile) { + try (var digest = TDigestState.create(newLimitedBreaker(ByteSizeValue.ofMb(100)), 1000)) { + for (var value : values) { + digest.add(value.doubleValue()); } - }); + return digest.size() == 0 ? null : digest.quantile(percentile / 100); + } + } + + private static Double getExpectedPercentileForExponentialHistograms(List values, double percentile) { + ExponentialHistogram merged = ExponentialHistogram.merge( + MAX_BUCKET_COUNT, + ExponentialHistogramCircuitBreaker.noop(), + values.stream().filter(Objects::nonNull).toList().iterator() + ); + double result = ExponentialHistogramQuantile.getQuantile(merged, percentile / 100.0); + return Double.isNaN(result) ? null : result; } }