diff --git a/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb b/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb index eacf0b3dc4..1940d44772 100644 --- a/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb +++ b/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb @@ -52,12 +52,13 @@ def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPOR ssl_verify_mode: MetricsExporter.ssl_verify_mode, headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}), compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'), - timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10)) + timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10), + aggregation_cardinality_limit: nil) raise ArgumentError, "invalid url for OTLP::MetricsExporter #{endpoint}" unless OpenTelemetry::Common::Utilities.valid_url?(endpoint) raise ArgumentError, "unsupported compression key #{compression}" unless compression.nil? || %w[gzip none].include?(compression) # create the MetricStore object - super() + super(aggregation_cardinality_limit: aggregation_cardinality_limit) @uri = if endpoint == ENV['OTEL_EXPORTER_OTLP_ENDPOINT'] URI.join(endpoint, 'v1/metrics') diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb index 65e351b328..580e723958 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb @@ -14,7 +14,7 @@ def collect(start_time, end_time, data_points) data_points.values.map!(&:dup) end - def update(increment, attributes, data_points) + def update(increment, attributes, data_points, cardinality_limit) data_points[attributes] = NumberDataPoint.new( {}, 0, diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb index 768036fc86..de9fdf7f2a 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb @@ -11,6 +11,7 @@ module Aggregation # Contains the implementation of the ExplicitBucketHistogram aggregation # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation class ExplicitBucketHistogram + OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze private_constant :DEFAULT_BOUNDARIES @@ -50,27 +51,46 @@ def collect(start_time, end_time, data_points) end end - def update(amount, attributes, data_points) - hdp = data_points.fetch(attributes) do - if @record_min_max - min = Float::INFINITY - max = -Float::INFINITY - end + def update(amount, attributes, data_points, cardinality_limit) + hdp = if data_points.key?(attributes) + data_points[attributes] + elsif data_points.size >= cardinality_limit + data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, 
data_points) + else + create_new_data_point(attributes, data_points) + end + + update_histogram_data_point(hdp, amount) + nil + end + + def aggregation_temporality + @aggregation_temporality.temporality + end + + private - data_points[attributes] = HistogramDataPoint.new( - attributes, - nil, # :start_time_unix_nano - nil, # :time_unix_nano - 0, # :count - 0, # :sum - empty_bucket_counts, # :bucket_counts - @boundaries, # :explicit_bounds - nil, # :exemplars - min, # :min - max # :max - ) + def create_new_data_point(attributes, data_points) + if @record_min_max + min = Float::INFINITY + max = -Float::INFINITY end + data_points[attributes] = HistogramDataPoint.new( + attributes, + nil, # :start_time_unix_nano + nil, # :time_unix_nano + 0, # :count + 0, # :sum + empty_bucket_counts, # :bucket_counts + @boundaries, # :explicit_bounds + nil, # :exemplars + min, # :min + max # :max + ) + end + + def update_histogram_data_point(hdp, amount) if @record_min_max hdp.max = amount if amount > hdp.max hdp.min = amount if amount < hdp.min @@ -78,19 +98,12 @@ def update(amount, attributes, data_points) hdp.sum += amount hdp.count += 1 - if @boundaries - bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size - hdp.bucket_counts[bucket_index] += 1 - end - nil - end + return unless @boundaries - def aggregation_temporality - @aggregation_temporality.temporality + bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size + hdp.bucket_counts[bucket_index] += 1 end - private - def empty_bucket_counts @boundaries ? Array.new(@boundaries.size + 1, 0) : nil end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb index e7b37b4d49..18702484a5 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb @@ -16,6 +16,8 @@ module Metrics module Aggregation # Contains the implementation of the {https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram ExponentialBucketHistogram} aggregation class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength + OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze + # relate to min max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale DEFAULT_SIZE = 160 DEFAULT_SCALE = 20 @@ -69,33 +71,50 @@ def collect(start_time, end_time, data_points) end end - # rubocop:disable Metrics/MethodLength - def update(amount, attributes, data_points) - # fetch or initialize the ExponentialHistogramDataPoint - hdp = data_points.fetch(attributes) do - if @record_min_max - min = Float::INFINITY - max = -Float::INFINITY - end + def update(amount, attributes, data_points, cardinality_limit) + hdp = if data_points.key?(attributes) + data_points[attributes] + elsif data_points.size >= cardinality_limit + data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points) + else + create_new_data_point(attributes, data_points) + end + + update_histogram_data_point(hdp, amount) + nil + end + + def aggregation_temporality + @aggregation_temporality.temporality + end + + private - data_points[attributes] = ExponentialHistogramDataPoint.new( - attributes, - nil, # :start_time_unix_nano - 0, # :time_unix_nano - 0, # :count - 0, # :sum - @scale, # :scale - @zero_count, # 
:zero_count - ExponentialHistogram::Buckets.new, # :positive - ExponentialHistogram::Buckets.new, # :negative - 0, # :flags - nil, # :exemplars - min, # :min - max, # :max - @zero_threshold # :zero_threshold) - ) + def create_new_data_point(attributes, data_points) + if @record_min_max + min = Float::INFINITY + max = -Float::INFINITY end + data_points[attributes] = ExponentialHistogramDataPoint.new( + attributes, + nil, # :start_time_unix_nano + 0, # :time_unix_nano + 0, # :count + 0, # :sum + @scale, # :scale + @zero_count, # :zero_count + ExponentialHistogram::Buckets.new, # :positive + ExponentialHistogram::Buckets.new, # :negative + 0, # :flags + nil, # :exemplars + min, # :min + max, # :max + @zero_threshold # :zero_threshold) + ) + end + + def update_histogram_data_point(hdp, amount) # Start to populate the data point (esp. the buckets) if @record_min_max hdp.max = amount if amount > hdp.max @@ -163,16 +182,8 @@ def update(amount, attributes, data_points) bucket_index += buckets.counts.size if bucket_index.negative? buckets.increment_bucket(bucket_index) - nil - end - # rubocop:enable Metrics/MethodLength - - def aggregation_temporality - @aggregation_temporality.temporality end - private - def grow_buckets(span, buckets) return if span < buckets.counts.size @@ -190,9 +201,6 @@ def empty_counts end def get_scale_change(low, high) - # puts "get_scale_change: low: #{low}, high: #{high}, @size: #{@size}" - # python code also produce 18 with 0,1048575, the high is little bit off - # just checked, the mapping is also ok, produce the 1048575 change = 0 while high - low >= @size high >>= 1 diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb index 8fb05912e2..72b67a6ddf 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb @@ -10,6 +10,8 @@ module Metrics module Aggregation # Contains the implementation of the LastValue aggregation class LastValue + OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze + def collect(start_time, end_time, data_points) ndps = data_points.values.map! 
do |ndp| ndp.start_time_unix_nano = start_time @@ -20,15 +22,34 @@ def collect(start_time, end_time, data_points) ndps end - def update(increment, attributes, data_points) + def update(increment, attributes, data_points, cardinality_limit) + # Check if we already have this attribute set + ndp = if data_points.key?(attributes) + data_points[attributes] + elsif data_points.size >= cardinality_limit + data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points) + else + create_new_data_point(attributes, data_points) + end + + update_number_data_point(ndp, increment) + nil + end + + private + + def create_new_data_point(attributes, data_points) data_points[attributes] = NumberDataPoint.new( attributes, nil, nil, - increment, + 0, nil ) - nil + end + + def update_number_data_point(ndp, increment) + ndp.value = increment end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb index fc1e65a4f8..4757748bbc 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb @@ -9,8 +9,9 @@ module SDK module Metrics module Aggregation # Contains the implementation of the Sum aggregation - # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#sum-aggregation class Sum + OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze + def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :cumulative), monotonic: false, instrument_kind: nil) @aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, instrument_kind: instrument_kind, default: :cumulative) @monotonic = monotonic @@ -36,27 +37,44 @@ def collect(start_time, end_time, data_points) end end + def update(increment, attributes, data_points, cardinality_limit) + return if @monotonic && increment < 0 + + # Check if we already have this attribute set + ndp = if data_points.key?(attributes) + data_points[attributes] + elsif data_points.size >= cardinality_limit + data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points) + else + create_new_data_point(attributes, data_points) + end + + update_number_data_point(ndp, increment) + nil + end + def monotonic? @monotonic end - def update(increment, attributes, data_points) - return if @monotonic && increment < 0 + def aggregation_temporality + @aggregation_temporality.temporality + end + + private - ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new( + def create_new_data_point(attributes, data_points) + data_points[attributes] = NumberDataPoint.new( attributes, nil, nil, 0, nil ) - - ndp.value += increment - nil end - def aggregation_temporality - @aggregation_temporality.temporality + def update_number_data_point(ndp, increment) + ndp.value += increment end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb index 8264c8e2e0..4adc440ea3 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb @@ -12,8 +12,8 @@ module Export # # Potentially useful for exploratory purposes. 
class ConsoleMetricPullExporter < MetricReader - def initialize - super + def initialize(aggregation_cardinality_limit: nil) + super(aggregation_cardinality_limit: aggregation_cardinality_limit) @stopped = false end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb index d0d8ccb902..9701c904c9 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb @@ -13,8 +13,8 @@ module Export class InMemoryMetricPullExporter < MetricReader attr_reader :metric_snapshots - def initialize - super + def initialize(aggregation_cardinality_limit: nil) + super(aggregation_cardinality_limit: aggregation_cardinality_limit) @metric_snapshots = [] @mutex = Mutex.new end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb index c261f4f10a..af44898eed 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb @@ -14,8 +14,8 @@ module Export class MetricReader attr_reader :metric_store - def initialize - @metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new + def initialize(aggregation_cardinality_limit: nil) + @metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new(cardinality_limit: aggregation_cardinality_limit) end def collect diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 3f55d6fbd6..3951ce63ef 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -24,8 +24,9 @@ class PeriodicMetricReader < MetricReader # @return a new instance of the {PeriodicMetricReader}. def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)), export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)), - exporter: nil) - super() + exporter: nil, + aggregation_cardinality_limit: nil) + super(aggregation_cardinality_limit: aggregation_cardinality_limit) @export_interval = export_interval_millis / 1000.0 @export_timeout = export_timeout_millis / 1000.0 diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb index fce7575421..89f2838aeb 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb @@ -49,14 +49,18 @@ def collect(start_time, end_time) def invoke_callback(timeout, attributes) if @registered_views.empty? 
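+          # No views are registered: fall back to the reader-supplied limit, or to
+          # DEFAULT_CARDINALITY_LIMIT (2000) when no limit was configured.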
+
+          resolved_cardinality_limit = @cardinality_limit || DEFAULT_CARDINALITY_LIMIT
           @mutex.synchronize do
             @callback.each do |cb|
               value = safe_guard_callback(cb, timeout: timeout)
-              @default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric)
+              @default_aggregation.update(value, attributes, @data_points, resolved_cardinality_limit) if value.is_a?(Numeric)
             end
           end
         else
           @registered_views.each do |view, data_points|
+            resolved_cardinality_limit = resolve_cardinality_limit(view)
+
             @mutex.synchronize do
               @callback.each do |cb|
                 value = safe_guard_callback(cb, timeout: timeout)
@@ -64,7 +68,7 @@ def invoke_callback(timeout, attributes)
                 merged_attributes = attributes || {}
                 merged_attributes.merge!(view.attribute_keys)
-                view.aggregation.update(value, merged_attributes, data_points) if view.valid_aggregation?
+                view.aggregation.update(value, merged_attributes, data_points, resolved_cardinality_limit) if view.valid_aggregation?
               end
             end
           end
diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb
index 41285d9a79..457c92b390 100644
--- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb
+++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb
@@ -13,11 +13,12 @@ module State
       # The MetricStore module provides SDK internal functionality that is not a part of the
       # public API.
       class MetricStore
-        def initialize
+        def initialize(cardinality_limit: nil)
           @mutex = Mutex.new
           @epoch_start_time = OpenTelemetry::Common::Utilities.time_in_nanoseconds
           @epoch_end_time = nil
           @metric_streams = []
+          @cardinality_limit = cardinality_limit
         end

         def collect
@@ -32,6 +33,7 @@ def collect

         def add_metric_stream(metric_stream)
           @mutex.synchronize do
+            metric_stream.cardinality_limit = @cardinality_limit
             @metric_streams = @metric_streams.dup.push(metric_stream)
             nil
           end
diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb
index 667b65ff29..cb2cb942b0 100644
--- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb
+++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb
@@ -12,8 +12,13 @@ module State
       #
       # The MetricStream class provides SDK internal functionality that is not a part of the
       # public API.
+      #
+      # rubocop:disable Metrics/ClassLength
      class MetricStream
        attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points
+        attr_writer :cardinality_limit
+
+        DEFAULT_CARDINALITY_LIMIT = 2000

        def initialize(
          name,
@@ -38,6 +43,10 @@ def initialize(
          @mutex = Mutex.new
        end

+        # The cardinality limit originates at the reader, e.g.
+        # MetricReader.new(aggregation_cardinality_limit: ...), which hands it to
+        # MetricStore.new; MetricStore#add_metric_stream then assigns it to each
+        # stream through the cardinality_limit writer above.
        def collect(start_time, end_time)
          @mutex.synchronize do
            metric_data = []
@@ -46,10 +55,14 @@ def collect(start_time, end_time)
            return metric_data if empty_data_point?

            if @registered_views.empty?
-              metric_data << aggregate_metric_data(start_time, end_time)
+              metric_data << aggregate_metric_data(start_time,
+                                                   end_time)
            else
              @registered_views.each do |view, data_points|
-                metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation, data_points: data_points)
+                metric_data << aggregate_metric_data(start_time,
+                                                     end_time,
+                                                     aggregation: view.aggregation,
+                                                     data_points: data_points)
              end
            end
@@ -57,16 +70,24 @@ def collect(start_time, end_time)
          end
        end

+        # A view may carry its own cardinality limit, which is passed down to the
+        # aggregation's update call; when neither the view nor the reader supplies
+        # a limit, the default of 2000 applies.
+        # Overflow is handled here in update rather than in collect: doing it at
+        # collect time would require sorting all (~2000) data points by time, which is expensive.
        # view will modify the data_point that is not suitable when there are multiple views
        def update(value, attributes)
          if @registered_views.empty?
-            @mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
+            resolved_cardinality_limit = resolve_cardinality_limit(nil)
+
+            @mutex.synchronize { @default_aggregation.update(value, attributes, @data_points, resolved_cardinality_limit) }
          else
            @registered_views.each do |view, data_points|
+              resolved_cardinality_limit = resolve_cardinality_limit(view)
              @mutex.synchronize do
                attributes ||= {}
                attributes.merge!(view.attribute_keys)
-                view.aggregation.update(value, attributes, data_points) if view.valid_aggregation?
+                view.aggregation.update(value, attributes, data_points, resolved_cardinality_limit) if view.valid_aggregation?
              end
            end
          end
@@ -109,6 +130,11 @@ def empty_data_point?
          end
        end

+        def resolve_cardinality_limit(view)
+          cardinality_limit = view&.aggregation_cardinality_limit || @cardinality_limit || DEFAULT_CARDINALITY_LIMIT
+          [cardinality_limit, 0].max # clamp a negative limit to 0
+        end
+
        def to_s
          instrument_info = +''
          instrument_info << "name=#{@name}"
@@ -123,6 +149,7 @@ def to_s
          end.join("\n")
        end
      end
+      # rubocop:enable Metrics/ClassLength
    end
  end
end
diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb
index 881f0f261e..5a4e3c971f 100644
--- a/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb
+++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb
@@ -10,13 +10,14 @@ module Metrics
    module View
      # RegisteredView is an internal class used to match Views with a given {MetricStream}
      class RegisteredView
-        attr_reader :name, :aggregation, :attribute_keys, :regex
+        attr_reader :name, :aggregation, :attribute_keys, :regex, :aggregation_cardinality_limit

        def initialize(name, **options)
          @name = name
          @options = options
          @aggregation = options[:aggregation]
          @attribute_keys = options[:attribute_keys] || {}
+          @aggregation_cardinality_limit = options[:aggregation_cardinality_limit]
          generate_regex_pattern(name)
        end
diff --git a/metrics_sdk/test/integration/console_metric_pull_exporter.rb b/metrics_sdk/test/integration/console_metric_pull_exporter_test.rb
similarity index 69%
rename from metrics_sdk/test/integration/console_metric_pull_exporter.rb
rename to metrics_sdk/test/integration/console_metric_pull_exporter_test.rb
index 57f8fb1e90..8bf7040429 100644
--- a/metrics_sdk/test/integration/console_metric_pull_exporter.rb
+++ b/metrics_sdk/test/integration/console_metric_pull_exporter_test.rb
@@ -78,5 +78,31 @@ 
_(exporter.export(metrics)).must_equal export::FAILURE end + + describe 'cardinality limit' do + it 'accepts cardinality_limit parameter on initialization' do + exporter_with_limit = export::ConsoleMetricPullExporter.new(aggregation_cardinality_limit: 100) + _(exporter_with_limit.export(metrics)).must_equal export::SUCCESS + end + + it 'enforces cardinality limit when collecting metrics' do + exporter_with_limit = export::ConsoleMetricPullExporter.new(aggregation_cardinality_limit: 2) + + OpenTelemetry::SDK.configure + OpenTelemetry.meter_provider.add_metric_reader(exporter_with_limit) + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('test_counter') + + # Add more data points than the cardinality limit + counter.add(1, attributes: { 'key' => 'a' }) + counter.add(2, attributes: { 'key' => 'b' }) + counter.add(3, attributes: { 'key' => 'c' }) # Should trigger overflow + + exporter_with_limit.pull + + # Check that overflow attribute is present in output + _(captured_stdout.string).must_match(/otel\.metric\.overflow.*true/) + end + end end end diff --git a/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb b/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb index 9f3aee62b0..e9f2a849bf 100644 --- a/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb +++ b/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb @@ -49,5 +49,71 @@ _(last_snapshot[0].aggregation_temporality).must_equal(:cumulative) end + + describe 'cardinality limit' do + before do + OpenTelemetry::SDK.configure + end + it 'accepts cardinality_limit parameter on initialization' do + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new(aggregation_cardinality_limit: 100) + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('test_counter') + counter.add(1) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot).wont_be_empty + end + + it 'enforces cardinality limit when collecting metrics' do + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new(aggregation_cardinality_limit: 2) + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('test_counter') + + # Add more data points than the cardinality limit + counter.add(1, attributes: { 'key' => 'a' }) + counter.add(2, attributes: { 'key' => 'b' }) + counter.add(3, attributes: { 'key' => 'c' }) # Should trigger overflow + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points.size).must_equal(3) + + # Find overflow data point + overflow_point = last_snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + _(overflow_point.value).must_equal(3) + end + + it 'handles zero cardinality limit' do + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new(aggregation_cardinality_limit: 0) + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('test_counter') + + counter.add(42, attributes: { 'key' => 'value' }) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points.size).must_equal(1) + 
+ # All should go to overflow + overflow_point = last_snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + _(overflow_point.value).must_equal(42) + end + end end end diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 5e85cefd3d..6cdb6fc76f 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -199,5 +199,62 @@ _(periodic_metric_reader.alive?).must_equal false end + + describe 'cardinality limit' do + it 'accepts cardinality_limit parameter on initialization' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new( + export_interval_millis: 100, + export_timeout_millis: 1000, + exporter: metric_exporter, + aggregation_cardinality_limit: 50 + ) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + _(periodic_metric_reader.alive?).must_equal true + periodic_metric_reader.shutdown + end + + it 'passes cardinality limit to metric collection' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new( + export_interval_millis: 100, + export_timeout_millis: 1000, + exporter: metric_exporter, + aggregation_cardinality_limit: 2 + ) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('test_counter') + + # Add more data points than the cardinality limit + counter.add(1, attributes: { 'key' => 'a' }) + counter.add(2, attributes: { 'key' => 'b' }) + counter.add(3, attributes: { 'key' => 'c' }) # Should trigger overflow + counter.add(4, attributes: { 'key' => 'e' }) + + # Wait for collection + sleep(0.2) + + periodic_metric_reader.shutdown + snapshot = metric_exporter.metric_snapshots + + _(snapshot).wont_be_empty + _(snapshot[0].data_points.size).must_equal(3) # Limited by cardinality + + # Find overflow data point + overflow_point = snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + end + end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb index b2212b462a..c129169b67 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb @@ -10,7 +10,7 @@ let(:data_points) { {} } let(:drop_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Drop.new } let(:aggregation_temporality) { :delta } - + let(:cardinality_limit) { 2000 } # Time in nano let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i } let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i } @@ -20,18 +20,18 @@ end it 'sets the timestamps' do - drop_aggregation.update(0, {}, data_points) + drop_aggregation.update(0, {}, data_points, cardinality_limit) ndp = drop_aggregation.collect(start_time, end_time, data_points)[0] _(ndp.start_time_unix_nano).must_equal(0) _(ndp.time_unix_nano).must_equal(0) end it 'aggregates and collects should collect no value for all 
collection' do - drop_aggregation.update(1, {}, data_points) - drop_aggregation.update(2, {}, data_points) + drop_aggregation.update(1, {}, data_points, cardinality_limit) + drop_aggregation.update(2, {}, data_points, cardinality_limit) - drop_aggregation.update(2, { 'foo' => 'bar' }, data_points) - drop_aggregation.update(2, { 'foo' => 'bar' }, data_points) + drop_aggregation.update(2, { 'foo' => 'bar' }, data_points, cardinality_limit) + drop_aggregation.update(2, { 'foo' => 'bar' }, data_points, cardinality_limit) ndps = drop_aggregation.collect(start_time, end_time, data_points) @@ -42,4 +42,21 @@ _(ndps[1].value).must_equal(0) _(ndps[1].attributes).must_equal({}) end + + describe 'cardinality limit' do + it 'respects cardinality limit but still drops all values' do + cardinality_limit = 2 + drop_aggregation.update(10, { 'key' => 'a' }, data_points, cardinality_limit) + drop_aggregation.update(20, { 'key' => 'b' }, data_points, cardinality_limit) + drop_aggregation.update(30, { 'key' => 'c' }, data_points, cardinality_limit) # Should be limited + drop_aggregation.update(40, { 'key' => 'd' }, data_points, cardinality_limit) # Should be limited + + ndps = drop_aggregation.collect(start_time, end_time, data_points) + + # All values should be 0 regardless of cardinality limit + ndps.each do |ndp| + _(ndp.value).must_equal(0) + end + end + end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb index 8dc7c2eabe..44b0644b0b 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb @@ -21,6 +21,7 @@ # Time in nano let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i } let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i } + let(:cardinality_limit) { 2000 } describe '#initialize' do it 'defaults to the delta aggregation temporality' do @@ -91,17 +92,17 @@ describe '#collect' do it 'returns all the data points' do - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) - - ebh.update(-10, { 'foo' => 'bar' }, data_points) - ebh.update(1, { 'foo' => 'bar' }, data_points) - ebh.update(22, { 'foo' => 'bar' }, data_points) - ebh.update(55, { 'foo' => 'bar' }, data_points) - ebh.update(80, { 'foo' => 'bar' }, data_points) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) + ebh.update(6, {}, data_points, cardinality_limit) + ebh.update(10, {}, data_points, cardinality_limit) + + ebh.update(-10, { 'foo' => 'bar' }, data_points, cardinality_limit) + ebh.update(1, { 'foo' => 'bar' }, data_points, cardinality_limit) + ebh.update(22, { 'foo' => 'bar' }, data_points, cardinality_limit) + ebh.update(55, { 'foo' => 'bar' }, data_points, cardinality_limit) + ebh.update(80, { 'foo' => 'bar' }, data_points, cardinality_limit) hdps = ebh.collect(start_time, end_time, data_points) _(hdps.size).must_equal(2) @@ -123,34 +124,34 @@ end it 'sets the timestamps' do - ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.start_time_unix_nano).must_equal(start_time) _(hdp.time_unix_nano).must_equal(end_time) end it 'calculates the 
count' do - ebh.update(0, {}, data_points) - ebh.update(0, {}, data_points) - ebh.update(0, {}, data_points) - ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(0, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.count).must_equal(4) end it 'does not aggregate between collects with default delta aggregation' do - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) + ebh.update(6, {}, data_points, cardinality_limit) + ebh.update(10, {}, data_points, cardinality_limit) hdps = ebh.collect(start_time, end_time, data_points) - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) + ebh.update(6, {}, data_points, cardinality_limit) + ebh.update(10, {}, data_points, cardinality_limit) # Assert that the recent update does not # impact the already collected metrics _(hdps[0].count).must_equal(5) @@ -173,18 +174,18 @@ let(:aggregation_temporality) { :not_delta } it 'allows metrics to accumulate' do - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) + ebh.update(6, {}, data_points, cardinality_limit) + ebh.update(10, {}, data_points, cardinality_limit) hdps = ebh.collect(start_time, end_time, data_points) - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) + ebh.update(6, {}, data_points, cardinality_limit) + ebh.update(10, {}, data_points, cardinality_limit) # Assert that the recent update does not # impact the already collected metrics _(hdps[0].count).must_equal(5) @@ -216,36 +217,36 @@ describe '#update' do it 'accumulates across the default boundaries' do - ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) - ebh.update(1, {}, data_points) - ebh.update(5, {}, data_points) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) - ebh.update(6, {}, data_points) - ebh.update(10, {}, data_points) + ebh.update(6, {}, data_points, cardinality_limit) + ebh.update(10, {}, data_points, cardinality_limit) - ebh.update(11, {}, data_points) - ebh.update(25, {}, data_points) + ebh.update(11, {}, data_points, cardinality_limit) + ebh.update(25, {}, data_points, cardinality_limit) - ebh.update(26, {}, data_points) - ebh.update(50, {}, data_points) + ebh.update(26, {}, data_points, cardinality_limit) + ebh.update(50, {}, data_points, cardinality_limit) - ebh.update(51, {}, 
data_points) - ebh.update(75, {}, data_points) + ebh.update(51, {}, data_points, cardinality_limit) + ebh.update(75, {}, data_points, cardinality_limit) - ebh.update(76, {}, data_points) - ebh.update(100, {}, data_points) + ebh.update(76, {}, data_points, cardinality_limit) + ebh.update(100, {}, data_points, cardinality_limit) - ebh.update(101, {}, data_points) - ebh.update(250, {}, data_points) + ebh.update(101, {}, data_points, cardinality_limit) + ebh.update(250, {}, data_points, cardinality_limit) - ebh.update(251, {}, data_points) - ebh.update(500, {}, data_points) + ebh.update(251, {}, data_points, cardinality_limit) + ebh.update(500, {}, data_points, cardinality_limit) - ebh.update(501, {}, data_points) - ebh.update(1000, {}, data_points) + ebh.update(501, {}, data_points, cardinality_limit) + ebh.update(1000, {}, data_points, cardinality_limit) - ebh.update(1001, {}, data_points) + ebh.update(1001, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_equal([1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1]) @@ -258,7 +259,7 @@ let(:boundaries) { [4, 2, 1] } it 'sorts it' do - ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points, cardinality_limit) _(ebh.collect(start_time, end_time, data_points)[0].explicit_bounds).must_equal([1, 2, 4]) end end @@ -267,7 +268,7 @@ let(:record_min_max) { false } it 'does not record min max values' do - ebh.update(-1, {}, data_points) + ebh.update(-1, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.min).must_be_nil _(hdp.min).must_be_nil @@ -278,13 +279,13 @@ let(:boundaries) { [0, 2, 4] } it 'aggregates' do - ebh.update(-1, {}, data_points) - ebh.update(0, {}, data_points) - ebh.update(1, {}, data_points) - ebh.update(2, {}, data_points) - ebh.update(3, {}, data_points) - ebh.update(4, {}, data_points) - ebh.update(5, {}, data_points) + ebh.update(-1, {}, data_points, cardinality_limit) + ebh.update(0, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) + ebh.update(2, {}, data_points, cardinality_limit) + ebh.update(3, {}, data_points, cardinality_limit) + ebh.update(4, {}, data_points, cardinality_limit) + ebh.update(5, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_equal([2, 2, 2, 1]) @@ -295,8 +296,8 @@ let(:boundaries) { [0] } it 'aggregates' do - ebh.update(-1, {}, data_points) - ebh.update(1, {}, data_points) + ebh.update(-1, {}, data_points, cardinality_limit) + ebh.update(1, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_equal([1, 1]) @@ -307,8 +308,8 @@ let(:boundaries) { [] } it 'aggregates but does not record bucket counts' do - ebh.update(-1, {}, data_points) - ebh.update(3, {}, data_points) + ebh.update(-1, {}, data_points, cardinality_limit) + ebh.update(3, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_be_nil @@ -324,8 +325,8 @@ let(:boundaries) { nil } it 'aggregates but does not record bucket counts' do - ebh.update(-1, {}, data_points) - ebh.update(3, {}, data_points) + ebh.update(-1, {}, data_points, cardinality_limit) + ebh.update(3, {}, data_points, cardinality_limit) hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_be_nil @@ -337,4 +338,75 @@ end end end + + describe 'cardinality limit' do + it 'handles overflow scenarios and merges 
measurements correctly' do + cardinality_limit = 2 + # Test basic overflow behavior and multiple overflow merging in one flow + ebh.update(1, { 'key' => 'a' }, data_points, cardinality_limit) + ebh.update(5, { 'key' => 'b' }, data_points, cardinality_limit) + ebh.update(10, { 'key' => 'c' }, data_points, cardinality_limit) # This should overflow + ebh.update(15, { 'key' => 'd' }, data_points, cardinality_limit) # Also overflow + + hdps = ebh.collect(start_time, end_time, data_points) + + _(hdps.size).must_equal(3) # 2 regular + 1 overflow + + overflow_point = hdps.find { |hdp| hdp.attributes == { 'otel.metric.overflow' => true } } + _(overflow_point).wont_be_nil + _(overflow_point.count).must_equal(2) # Both overflow measurements merged + _(overflow_point.sum).must_equal(25) # 10 + 15 + end + + describe 'edge cases' do + it 'handles cardinality limit 0' do + # Test cardinality limit of 0 - everything overflows + cardinality_limit = 0 + ebh.update(5, { 'key' => 'value' }, data_points, cardinality_limit) + + hdps = ebh.collect(start_time, end_time, data_points) + + _(hdps.size).must_equal(1) + overflow_point = hdps.find { |hdp| hdp.attributes == { 'otel.metric.overflow' => true } } + _(overflow_point).wont_be_nil + _(overflow_point.count).must_equal(1) + _(overflow_point.sum).must_equal(5) + _(overflow_point.min).must_equal(5) + _(overflow_point.max).must_equal(5) + end + + it 'handles cardinality limit 1' do + # Test cardinality limit of 1 with bucket counts preservation + cardinality_limit = 1 + ebh.update(1, { 'key' => 'a' }, data_points, cardinality_limit) + ebh.update(10, { 'key' => 'b' }, data_points, cardinality_limit) # Overflow + ebh.update(100, { 'key' => 'c' }, data_points, cardinality_limit) # More overflow + + hdps = ebh.collect(start_time, end_time, data_points) + overflow_point = hdps.find { |hdp| hdp.attributes == { 'otel.metric.overflow' => true } } + + # Check bucket counts are properly merged + _(overflow_point.bucket_counts).wont_be_nil + _(overflow_point.bucket_counts.sum).must_equal(overflow_point.count) + _(overflow_point.min).must_equal(10) + _(overflow_point.max).must_equal(100) + end + + it 'handles very large cardinality scenarios' do + cardinality_limit = 100 + + # Add 150 unique attribute sets + 150.times do |i| + ebh.update(i, { 'unique_key' => "value_#{i}" }, data_points, cardinality_limit) + end + + hdps = ebh.collect(start_time, end_time, data_points) + + _(hdps.size).must_equal(101) # 100 + 1 + overflow_point = hdps.find { |hdp| hdp.attributes == { 'otel.metric.overflow' => true } } + _(overflow_point).wont_be_nil + _(overflow_point.count).must_equal(50) # 150 - 100 + end + end + end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb index 1b7b249926..cc322434ea 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb @@ -25,15 +25,16 @@ # Time in nano let(:start_time) { Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) } let(:end_time) { Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) + (60 * 1_000_000_000) } + let(:cardinality_limit) { 2000 } describe '#collect' do it 'returns all the data points' do - expbh.update(1.03, {}, data_points) - expbh.update(1.23, {}, data_points) - expbh.update(0, {}, data_points) + expbh.update(1.03, {}, 
data_points, cardinality_limit) + expbh.update(1.23, {}, data_points, cardinality_limit) + expbh.update(0, {}, data_points, cardinality_limit) - expbh.update(1.45, { 'foo' => 'bar' }, data_points) - expbh.update(1.67, { 'foo' => 'bar' }, data_points) + expbh.update(1.45, { 'foo' => 'bar' }, data_points, cardinality_limit) + expbh.update(1.67, { 'foo' => 'bar' }, data_points, cardinality_limit) exphdps = expbh.collect(start_time, end_time, data_points) @@ -80,9 +81,9 @@ zero_threshold: 0 ) - expbh.update(2, {}, data_points) - expbh.update(4, {}, data_points) - expbh.update(1, {}, data_points) + expbh.update(2, {}, data_points, cardinality_limit) + expbh.update(4, {}, data_points, cardinality_limit) + expbh.update(1, {}, data_points, cardinality_limit) exphdps = expbh.collect(start_time, end_time, data_points) @@ -113,12 +114,12 @@ zero_threshold: 0 ) - expbh.update(2, {}, data_points) - expbh.update(2, {}, data_points) - expbh.update(2, {}, data_points) - expbh.update(1, {}, data_points) - expbh.update(8, {}, data_points) - expbh.update(0.5, {}, data_points) + expbh.update(2, {}, data_points, cardinality_limit) + expbh.update(2, {}, data_points, cardinality_limit) + expbh.update(2, {}, data_points, cardinality_limit) + expbh.update(1, {}, data_points, cardinality_limit) + expbh.update(8, {}, data_points, cardinality_limit) + expbh.update(0.5, {}, data_points, cardinality_limit) exphdps = expbh.collect(start_time, end_time, data_points) @@ -143,8 +144,8 @@ zero_threshold: 0 ) - expbh.update(0, {}, data_points) - expbh.update(10_000, {}, data_points) + expbh.update(0, {}, data_points, cardinality_limit) + expbh.update(10_000, {}, data_points, cardinality_limit) exphdps = expbh.collect(start_time, end_time, data_points) @@ -203,7 +204,7 @@ ) permutation.each do |value| - expbh.update(value, {}, data_points) + expbh.update(value, {}, data_points, cardinality_limit) end exphdps = expbh.collect(start_time, end_time, data_points) @@ -226,9 +227,9 @@ zero_threshold: 0 ) - expbh.update(Float::MAX, {}, data_points) - expbh.update(1, {}, data_points) - expbh.update(2**-1074, {}, data_points) + expbh.update(Float::MAX, {}, data_points, cardinality_limit) + expbh.update(1, {}, data_points, cardinality_limit) + expbh.update(2**-1074, {}, data_points, cardinality_limit) exphdps = expbh.collect(start_time, end_time, data_points) @@ -250,7 +251,7 @@ ) [1, 3, 5, 7, 9].each do |value| - expbh.update(value, {}, data_points) + expbh.update(value, {}, data_points, cardinality_limit) end exphdps = expbh.collect(start_time, end_time, data_points) @@ -265,7 +266,7 @@ ) [-1, -3, -5, -7, -9].each do |value| - expbh.update(value, {}, data_points) + expbh.update(value, {}, data_points, cardinality_limit) end exphdps = expbh.collect(start_time, end_time, data_points) @@ -291,15 +292,41 @@ end it 'test_invalid_size_validation' do - error = assert_raises(ArgumentError) do - OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 10_000_000) + assert_raises ArgumentError do + OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 1) end - assert_equal('Max size 10000000 is larger than maximum size 16384', error.message) - error = assert_raises(ArgumentError) do - OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 0) + assert_raises ArgumentError do + OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 16_385) + end + end + + describe 'cardinality limit' do + it 'handles overflow data points and merges 
overflow measurements correctly' do + cardinality_limit = 2 + + expbh.update(1.5, { 'key' => 'a' }, data_points, cardinality_limit) + expbh.update(2.5, { 'key' => 'b' }, data_points, cardinality_limit) + expbh.update(3.5, { 'key' => 'c' }, data_points, cardinality_limit) # This should start overflow + expbh.update(4.5, { 'key' => 'd' }, data_points, cardinality_limit) + expbh.update(0, { 'key' => 'e' }, data_points, cardinality_limit) + + exphdps = expbh.collect(start_time, end_time, data_points) + + _(exphdps.size).must_equal(3) + + overflow_point = exphdps.find { |hdp| hdp.attributes == { 'otel.metric.overflow' => true } } + + _(overflow_point).wont_be_nil + _(overflow_point.count).must_equal(3) + _(overflow_point.sum).must_equal(8) + _(overflow_point.scale).must_equal(5) + _(overflow_point.min).must_equal(0) + _(overflow_point.max).must_equal(4.5) + _(overflow_point.positive.counts).must_equal([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0]) + _(overflow_point.negative.counts).must_equal([0]) + _(overflow_point.zero_count).must_equal(1) end - assert_equal('Max size 0 is smaller than minimum size 2', error.message) end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb index 6e606d7462..59d07f67e3 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb @@ -9,24 +9,24 @@ describe OpenTelemetry::SDK::Metrics::Aggregation::LastValue do let(:data_points) { {} } let(:last_value_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new } - + let(:cardinality_limit) { 2000 } # Time in nano let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i } let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i } it 'sets the timestamps' do - last_value_aggregation.update(0, {}, data_points) + last_value_aggregation.update(0, {}, data_points, cardinality_limit) ndp = last_value_aggregation.collect(start_time, end_time, data_points)[0] _(ndp.start_time_unix_nano).must_equal(start_time) _(ndp.time_unix_nano).must_equal(end_time) end it 'aggregates and collects should collect the last value' do - last_value_aggregation.update(1, {}, data_points) - last_value_aggregation.update(2, {}, data_points) + last_value_aggregation.update(1, {}, data_points, cardinality_limit) + last_value_aggregation.update(2, {}, data_points, cardinality_limit) - last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points) - last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points) + last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points, cardinality_limit) + last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points, cardinality_limit) ndps = last_value_aggregation.collect(start_time, end_time, data_points) _(ndps[0].value).must_equal(2) @@ -35,4 +35,52 @@ _(ndps[1].value).must_equal(2) _(ndps[1].attributes).must_equal('foo' => 'bar') end + + describe 'cardinality limit' do + let(:cardinality_limit) { 2 } + + it 'creates overflow data point when cardinality limit is exceeded' do + last_value_aggregation.update(10, { 'key' => 'a' }, data_points, cardinality_limit) + last_value_aggregation.update(20, { 'key' => 'a' }, data_points, cardinality_limit) + last_value_aggregation.update(20, { 'key' => 'b' }, data_points, cardinality_limit) + last_value_aggregation.update(30, { 'key' => 'c' }, data_points, cardinality_limit) # This should overflow + 
last_value_aggregation.update(40, { 'key' => 'e' }, data_points, cardinality_limit) # This should overflow
+
+      ndps = last_value_aggregation.collect(start_time, end_time, data_points)
+
+      _(ndps.size).must_equal(3)
+
+      assert_equal(ndps[0].attributes, { 'key' => 'a' })
+      _(ndps[0].value).must_equal 20
+
+      assert_equal(ndps[1].attributes, { 'key' => 'b' })
+      _(ndps[1].value).must_equal 20
+
+      overflow_point = ndps.find { |ndp| ndp.attributes == { 'otel.metric.overflow' => true } }
+      _(overflow_point.value).must_equal(40)
+    end
+
+    it 'updates existing attribute sets without triggering overflow' do
+      last_value_aggregation.update(10, { 'key' => 'a' }, data_points, cardinality_limit)
+      last_value_aggregation.update(20, { 'key' => 'b' }, data_points, cardinality_limit)
+      last_value_aggregation.update(15, { 'key' => 'a' }, data_points, cardinality_limit) # Update existing
+
+      _(data_points.size).must_equal(2)
+      _(data_points[{ 'key' => 'a' }].value).must_equal(15)
+      _(data_points[{ 'key' => 'b' }].value).must_equal(20)
+    end
+
+    describe 'edge cases' do
+      it 'handles cardinality limit 0' do
+        # Test cardinality limit of 0 - everything overflows
+        cardinality_limit = 0
+        last_value_aggregation.update(42, { 'key' => 'value' }, data_points, cardinality_limit)
+        last_value_aggregation.update(30, { 'key' => 'value' }, data_points, cardinality_limit)
+
+        _(data_points.size).must_equal(1)
+        overflow_point = data_points[{ 'otel.metric.overflow' => true }]
+        _(overflow_point.value).must_equal(30)
+      end
+    end
+  end
 end
diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb
index 870f7933bb..ee5c3ff0a0 100644
--- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb
+++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb
@@ -11,6 +11,7 @@
   let(:sum_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality:, monotonic:) }
   let(:aggregation_temporality) { :delta }
   let(:monotonic) { false }
+  let(:cardinality_limit) { 2000 }

   # Time in nano
   let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i }
@@ -84,18 +85,18 @@
   end

   it 'sets the timestamps' do
-    sum_aggregation.update(0, {}, data_points)
+    sum_aggregation.update(0, {}, data_points, cardinality_limit)
     ndp = sum_aggregation.collect(start_time, end_time, data_points)[0]
     _(ndp.start_time_unix_nano).must_equal(start_time)
     _(ndp.time_unix_nano).must_equal(end_time)
   end

   it 'aggregates and collects' do
-    sum_aggregation.update(1, {}, data_points)
-    sum_aggregation.update(2, {}, data_points)
+    sum_aggregation.update(1, {}, data_points, cardinality_limit)
+    sum_aggregation.update(2, {}, data_points, cardinality_limit)

-    sum_aggregation.update(2, { 'foo' => 'bar' }, data_points)
-    sum_aggregation.update(2, { 'foo' => 'bar' }, data_points)
+    sum_aggregation.update(2, { 'foo' => 'bar' }, data_points, cardinality_limit)
+    sum_aggregation.update(2, { 'foo' => 'bar' }, data_points, cardinality_limit)

     ndps = sum_aggregation.collect(start_time, end_time, data_points)
     _(ndps[0].value).must_equal(3)
@@ -106,19 +107,19 @@
   end

   it 'aggregates and collects negative values' do
-    sum_aggregation.update(1, {}, data_points)
-    sum_aggregation.update(-2, {}, data_points)
+    sum_aggregation.update(1, {}, data_points, cardinality_limit)
+    sum_aggregation.update(-2, {}, data_points, cardinality_limit)

     ndps = sum_aggregation.collect(start_time, end_time, data_points)
     _(ndps[0].value).must_equal(-1)
   end

   it 'does not aggregate between 
collects' do
-    sum_aggregation.update(1, {}, data_points)
-    sum_aggregation.update(2, {}, data_points)
+    sum_aggregation.update(1, {}, data_points, cardinality_limit)
+    sum_aggregation.update(2, {}, data_points, cardinality_limit)

     ndps = sum_aggregation.collect(start_time, end_time, data_points)
-    sum_aggregation.update(1, {}, data_points)
+    sum_aggregation.update(1, {}, data_points, cardinality_limit)
     # Assert that the recent update does not
     # impact the already collected metrics
     _(ndps[0].value).must_equal(3)
@@ -133,11 +134,11 @@
     let(:aggregation_temporality) { :not_delta }

     it 'allows metrics to accumulate' do
-      sum_aggregation.update(1, {}, data_points)
-      sum_aggregation.update(2, {}, data_points)
+      sum_aggregation.update(1, {}, data_points, cardinality_limit)
+      sum_aggregation.update(2, {}, data_points, cardinality_limit)

       ndps = sum_aggregation.collect(start_time, end_time, data_points)
-      sum_aggregation.update(1, {}, data_points)
+      sum_aggregation.update(1, {}, data_points, cardinality_limit)
       # Assert that the recent update does not
       # impact the already collected metrics
       _(ndps[0].value).must_equal(3)
@@ -155,11 +156,45 @@
     let(:monotonic) { true }

     it 'does not allow negative values to accumulate' do
-      sum_aggregation.update(1, {}, data_points)
-      sum_aggregation.update(-2, {}, data_points)
+      sum_aggregation.update(1, {}, data_points, cardinality_limit)
+      sum_aggregation.update(-2, {}, data_points, cardinality_limit)

       ndps = sum_aggregation.collect(start_time, end_time, data_points)
       _(ndps[0].value).must_equal(1)
     end
   end
+
+  describe 'cardinality limit' do
+    describe 'with cumulative aggregation' do
+      it 'preserves pre-overflow attributes after overflow starts' do
+        cardinality_limit = 3
+        sum_aggregation.update(1, { 'key' => 'a' }, data_points, cardinality_limit)
+        sum_aggregation.update(2, { 'key' => 'b' }, data_points, cardinality_limit)
+        sum_aggregation.update(3, { 'key' => 'c' }, data_points, cardinality_limit)
+        sum_aggregation.update(4, { 'key' => 'd' }, data_points, cardinality_limit) # This should overflow
+        sum_aggregation.update(5, { 'key' => 'e' }, data_points, cardinality_limit)
+        sum_aggregation.update(5, { 'key' => 'a' }, data_points, cardinality_limit)
+
+        _(data_points.size).must_equal(4) # 3 original + 1 overflow
+        _(data_points[{ 'key' => 'a' }].value).must_equal(6) # 1 + 5
+        _(data_points[{ 'key' => 'b' }].value).must_equal(2) # unchanged by overflow
+        _(data_points[{ 'otel.metric.overflow' => true }].value).must_equal(9)
+        _(data_points[{ 'key' => 'd' }]).must_be_nil
+        _(data_points[{ 'key' => 'e' }]).must_be_nil
+      end
+    end
+
+    describe 'edge cases' do
+      it 'handles cardinality limit 0' do
+        # Test cardinality limit of 0 - everything overflows
+        cardinality_limit = 0
+        sum_aggregation.update(10, { 'key' => 'a' }, data_points, cardinality_limit)
+        sum_aggregation.update(12, { 'key' => 'd' }, data_points, cardinality_limit)

+        _(data_points.size).must_equal(1)
+        overflow_point = data_points[{ 'otel.metric.overflow' => true }]
+        _(overflow_point.value).must_equal(22)
+      end
+    end
+  end
 end
diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb
index f9d9fa2257..0eedcbf5c3 100644
--- a/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb
+++ b/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb
@@ -292,5 +292,141 @@
     _(registered_view.name_match('!@#$%^&')).must_equal true
   end
 end
+
+  describe 'cardinality limit in views' do
+    before { 
reset_metrics_sdk } + + it 'applies cardinality limit from view configuration' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + + # Create a view with cardinality limit + OpenTelemetry.meter_provider.add_view( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new, + aggregation_cardinality_limit: 2 + ) + + counter = meter.create_counter('test_counter') + + # Add more data points than the view's cardinality limit + counter.add(1, attributes: { 'key' => 'a' }) + counter.add(2, attributes: { 'key' => 'b' }) + counter.add(3, attributes: { 'key' => 'c' }) # Should trigger overflow + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot).wont_be_empty + _(last_snapshot[0].data_points.size).must_equal(3) + + # Find overflow data point + overflow_point = last_snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + _(overflow_point.value).must_equal(3) + end + + it 'view cardinality limit overrides global limit' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new(aggregation_cardinality_limit: 10) + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + + # View with stricter cardinality limit than global + OpenTelemetry.meter_provider.add_view( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new, + aggregation_cardinality_limit: 1 + ) + + counter = meter.create_counter('test_counter') + + counter.add(1, attributes: { 'key' => 'a' }) + counter.add(2, attributes: { 'key' => 'b' }) # Should trigger overflow immediately + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points.size).must_equal(2) # Limited by view's cardinality limit + + overflow_point = last_snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + _(overflow_point.value).must_equal(2) + end + + it 'handles zero cardinality limit in view' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + + OpenTelemetry.meter_provider.add_view( + 'test_counter', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new, + aggregation_cardinality_limit: 0 + ) + + counter = meter.create_counter('test_counter') + + counter.add(42, attributes: { 'key' => 'value' }) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points.size).must_equal(1) + + # All should go to overflow + overflow_point = last_snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + _(overflow_point.value).must_equal(42) + end + + it 'works with histogram aggregation and cardinality limit' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + + 
OpenTelemetry.meter_provider.add_view( + 'test_histogram', + aggregation: OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new, + aggregation_cardinality_limit: 1 + ) + + histogram = meter.create_histogram('test_histogram') + + histogram.record(5, attributes: { 'key' => 'a' }) + histogram.record(15, attributes: { 'key' => 'b' }) # Should trigger overflow + histogram.record(20, attributes: { 'key' => 'c' }) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points.size).must_equal(2) + + overflow_point = last_snapshot[0].data_points.find do |dp| + dp.attributes == { 'otel.metric.overflow' => true } + end + _(overflow_point).wont_be_nil + _(overflow_point.count).must_equal(2) + _(overflow_point.sum).must_equal(35) + end + end end end
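
Usage sketch: how the new knob is meant to be wired up end to end, based on the
signatures added above. The instrument name and the concrete limit values are
illustrative only, not part of this change.

    require 'opentelemetry-sdk'
    require 'opentelemetry-metrics-sdk'

    OpenTelemetry::SDK.configure

    # Reader-level limit: every stream collected through this reader caps each
    # metric at the given number of attribute sets; once the cap is reached,
    # further series are folded into one data point tagged
    # { 'otel.metric.overflow' => true }.
    exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new(
      aggregation_cardinality_limit: 2000
    )
    OpenTelemetry.meter_provider.add_metric_reader(exporter)

    # View-level limit: per resolve_cardinality_limit, a view's limit takes
    # precedence over the reader-level limit and the 2000 default for the
    # instruments the view matches; negative values are clamped to 0.
    OpenTelemetry.meter_provider.add_view(
      'test_counter',
      aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new,
      aggregation_cardinality_limit: 500
    )

    counter = OpenTelemetry.meter_provider.meter('test').create_counter('test_counter')
    counter.add(1, attributes: { 'key' => 'a' })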