Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPOR
ssl_verify_mode: MetricsExporter.ssl_verify_mode,
headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}),
compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'),
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10))
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10),
aggregation_cardinality_limit: nil)
raise ArgumentError, "invalid url for OTLP::MetricsExporter #{endpoint}" unless OpenTelemetry::Common::Utilities.valid_url?(endpoint)
raise ArgumentError, "unsupported compression key #{compression}" unless compression.nil? || %w[gzip none].include?(compression)

# create the MetricStore object
super()
super(aggregation_cardinality_limit: aggregation_cardinality_limit)

@uri = if endpoint == ENV['OTEL_EXPORTER_OTLP_ENDPOINT']
URI.join(endpoint, 'v1/metrics')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def collect(start_time, end_time, data_points)
data_points.values.map!(&:dup)
end

def update(increment, attributes, data_points)
def update(increment, attributes, data_points, cardinality_limit)
data_points[attributes] = NumberDataPoint.new(
{},
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Aggregation
# Contains the implementation of the ExplicitBucketHistogram aggregation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation
class ExplicitBucketHistogram
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
private_constant :DEFAULT_BOUNDARIES

Expand Down Expand Up @@ -50,47 +51,59 @@ def collect(start_time, end_time, data_points)
end
end

def update(amount, attributes, data_points)
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end
def update(amount, attributes, data_points, cardinality_limit)
hdp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

update_histogram_data_point(hdp, amount)
nil
end

# The aggregation temporality configured for this histogram.
#
# @return the temporality value exposed by the AggregationTemporality
#   wrapper (presumably :delta or :cumulative — confirm in
#   AggregationTemporality#temporality)
def aggregation_temporality
@aggregation_temporality.temporality
end

private

data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
0, # :count
0, # :sum
empty_bucket_counts, # :bucket_counts
@boundaries, # :explicit_bounds
nil, # :exemplars
min, # :min
max # :max
)
# Builds a fresh HistogramDataPoint for +attributes+, registers it in
# +data_points+, and returns it.
#
# When min/max recording is enabled, min and max start at the
# +Infinity/-Infinity sentinels so the first recorded value always
# replaces them; otherwise they are nil.
#
# @param attributes [Hash] attribute set keying the new data point
# @param data_points [Hash] storage mapping attribute sets to data points
# @return [HistogramDataPoint] the newly registered data point
def create_new_data_point(attributes, data_points)
  min = @record_min_max ? Float::INFINITY : nil
  max = @record_min_max ? -Float::INFINITY : nil

  data_points[attributes] = HistogramDataPoint.new(
    attributes,
    nil,                 # :start_time_unix_nano
    nil,                 # :time_unix_nano
    0,                   # :count
    0,                   # :sum
    empty_bucket_counts, # :bucket_counts
    @boundaries,         # :explicit_bounds
    nil,                 # :exemplars
    min,                 # :min
    max                  # :max
  )
end

# Folds a single measurement into an existing HistogramDataPoint:
# updates min/max (when enabled), sum, count, and the matching bucket.
#
# @param hdp [HistogramDataPoint] the data point to mutate
# @param amount [Numeric] the measurement to record
# @return [nil]
def update_histogram_data_point(hdp, amount)
  if @record_min_max
    hdp.max = [hdp.max, amount].max
    hdp.min = [hdp.min, amount].min
  end

  hdp.sum += amount
  hdp.count += 1

  if @boundaries
    # First boundary >= amount selects the bucket; anything above the last
    # boundary lands in the final overflow bucket.
    idx = @boundaries.bsearch_index { |bound| bound >= amount } || @boundaries.size
    hdp.bucket_counts[idx] += 1
  end
  nil
end
return unless @boundaries

def aggregation_temporality
@aggregation_temporality.temporality
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
hdp.bucket_counts[bucket_index] += 1
end

private

# Fresh all-zero bucket-count array sized to the configured boundaries
# (one extra slot for values above the last boundary), or nil when this
# histogram has no boundaries.
#
# @return [Array<Integer>, nil]
def empty_bucket_counts
  return nil unless @boundaries

  Array.new(@boundaries.size + 1, 0)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module Metrics
module Aggregation
# Contains the implementation of the {https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram ExponentialBucketHistogram} aggregation
class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze

# relate to min max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
DEFAULT_SIZE = 160
DEFAULT_SCALE = 20
Expand Down Expand Up @@ -69,33 +71,50 @@ def collect(start_time, end_time, data_points)
end
end

# rubocop:disable Metrics/MethodLength
def update(amount, attributes, data_points)
# fetch or initialize the ExponentialHistogramDataPoint
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end
def update(amount, attributes, data_points, cardinality_limit)
hdp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

update_histogram_data_point(hdp, amount)
nil
end

# The aggregation temporality configured for this exponential histogram.
#
# @return the temporality value exposed by the AggregationTemporality
#   wrapper (presumably :delta or :cumulative — confirm in
#   AggregationTemporality#temporality)
def aggregation_temporality
@aggregation_temporality.temporality
end

private

data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold)
)
# Builds a fresh ExponentialHistogramDataPoint for +attributes+, registers
# it in +data_points+, and returns it.
#
# min/max use the +Infinity/-Infinity sentinels when min/max recording is
# enabled so the first recorded value always replaces them.
#
# @param attributes [Hash] attribute set keying the new data point
# @param data_points [Hash] storage mapping attribute sets to data points
# @return [ExponentialHistogramDataPoint] the newly registered data point
def create_new_data_point(attributes, data_points)
  min = @record_min_max ? Float::INFINITY : nil
  max = @record_min_max ? -Float::INFINITY : nil

  data_points[attributes] = ExponentialHistogramDataPoint.new(
    attributes,
    nil,                               # :start_time_unix_nano
    0,                                 # :time_unix_nano
    0,                                 # :count
    0,                                 # :sum
    @scale,                            # :scale
    @zero_count,                       # :zero_count
    ExponentialHistogram::Buckets.new, # :positive
    ExponentialHistogram::Buckets.new, # :negative
    0,                                 # :flags
    nil,                               # :exemplars
    min,                               # :min
    max,                               # :max
    @zero_threshold                    # :zero_threshold
  )
end

def update_histogram_data_point(hdp, amount)
# Start to populate the data point (esp. the buckets)
if @record_min_max
hdp.max = amount if amount > hdp.max
Expand Down Expand Up @@ -163,16 +182,8 @@ def update(amount, attributes, data_points)
bucket_index += buckets.counts.size if bucket_index.negative?

buckets.increment_bucket(bucket_index)
nil
end
# rubocop:enable Metrics/MethodLength

def aggregation_temporality
@aggregation_temporality.temporality
end

private

def grow_buckets(span, buckets)
return if span < buckets.counts.size

Expand All @@ -190,9 +201,6 @@ def empty_counts
end

def get_scale_change(low, high)
# puts "get_scale_change: low: #{low}, high: #{high}, @size: #{@size}"
# python code also produce 18 with 0,1048575, the high is little bit off
# just checked, the mapping is also ok, produce the 1048575
change = 0
while high - low >= @size
high >>= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Metrics
module Aggregation
# Contains the implementation of the LastValue aggregation
class LastValue
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze

def collect(start_time, end_time, data_points)
ndps = data_points.values.map! do |ndp|
ndp.start_time_unix_nano = start_time
Expand All @@ -20,15 +22,34 @@ def collect(start_time, end_time, data_points)
ndps
end

def update(increment, attributes, data_points)
def update(increment, attributes, data_points, cardinality_limit)
# Check if we already have this attribute set
ndp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

update_number_data_point(ndp, increment)
nil
end

private

# Builds and registers a fresh NumberDataPoint for +attributes+.
#
# Fixes two defects: the body referenced +increment+, which is not in
# scope in this method (NameError at runtime), and the trailing +nil+
# made the method return nil, so the caller's subsequent
# update_number_data_point(ndp, increment) would raise NoMethodError.
# The value starts as nil because LastValue overwrites it on every
# update; the argument list now mirrors Sum#create_new_data_point.
#
# @param attributes [Hash] attribute set keying the new data point
# @param data_points [Hash] storage mapping attribute sets to data points
# @return [NumberDataPoint] the newly registered data point
def create_new_data_point(attributes, data_points)
  data_points[attributes] = NumberDataPoint.new(
    attributes,
    nil, # :start_time_unix_nano
    nil, # :time_unix_nano
    nil, # :value — set by update_number_data_point right after creation
    nil  # :exemplars
  )
end

# Overwrites the data point's value — LastValue keeps only the most
# recent measurement.
#
# @param ndp [NumberDataPoint] the data point to mutate
# @param increment [Numeric] the latest recorded value
def update_number_data_point(ndp, increment)
ndp.value = increment
end
end
end
Expand Down
36 changes: 27 additions & 9 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ module SDK
module Metrics
module Aggregation
# Contains the implementation of the Sum aggregation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#sum-aggregation
class Sum
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze

def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :cumulative), monotonic: false, instrument_kind: nil)
@aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, instrument_kind: instrument_kind, default: :cumulative)
@monotonic = monotonic
Expand All @@ -36,27 +37,44 @@ def collect(start_time, end_time, data_points)
end
end

# Adds +increment+ to the sum for +attributes+, folding brand-new
# attribute sets into the overflow data point once +cardinality_limit+
# distinct sets already exist. Monotonic sums silently drop negative
# increments.
#
# @param increment [Numeric] the amount to add
# @param attributes [Hash] attribute set identifying the data point
# @param data_points [Hash] storage mapping attribute sets to data points
# @param cardinality_limit [Integer] maximum number of distinct attribute sets
# @return [nil]
def update(increment, attributes, data_points, cardinality_limit)
  return if @monotonic && increment < 0

  # Existing attribute sets keep their point; new ones overflow at the limit.
  key = if data_points.key?(attributes) || data_points.size < cardinality_limit
          attributes
        else
          OVERFLOW_ATTRIBUTE_SET
        end

  ndp = data_points.key?(key) ? data_points[key] : create_new_data_point(key, data_points)
  update_number_data_point(ndp, increment)
  nil
end

# Whether this Sum only accepts non-negative increments (negative
# increments are dropped in #update).
#
# @return [Boolean] the monotonic flag supplied at construction
def monotonic?
@monotonic
end

def update(increment, attributes, data_points)
return if @monotonic && increment < 0
# The aggregation temporality configured for this Sum.
#
# @return the temporality value exposed by the AggregationTemporality
#   wrapper (presumably :delta or :cumulative — confirm in
#   AggregationTemporality#temporality)
def aggregation_temporality
@aggregation_temporality.temporality
end

private

ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new(
# Builds and registers a zero-valued NumberDataPoint for +attributes+.
#
# Removes the stray `ndp.value += increment` / `nil` lines left over from
# the old #update implementation: neither +ndp+ nor +increment+ is in
# scope here (NameError at runtime), and the trailing +nil+ made the
# method return nil instead of the new data point that the caller
# immediately passes to update_number_data_point.
#
# @param attributes [Hash] attribute set keying the new data point
# @param data_points [Hash] storage mapping attribute sets to data points
# @return [NumberDataPoint] the newly registered data point
def create_new_data_point(attributes, data_points)
  data_points[attributes] = NumberDataPoint.new(
    attributes,
    nil, # :start_time_unix_nano
    nil, # :time_unix_nano
    0,   # :value — accumulated via update_number_data_point
    nil  # :exemplars
  )
end

def aggregation_temporality
@aggregation_temporality.temporality
def update_number_data_point(ndp, increment)
ndp.value += increment
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ module Export
#
# Potentially useful for exploratory purposes.
class ConsoleMetricPullExporter < MetricReader
def initialize
super
# @param aggregation_cardinality_limit [Integer, nil] maximum number of
#   distinct attribute sets per metric stream, forwarded to the
#   MetricReader superclass; nil leaves the behavior to the metric
#   store's default — TODO confirm default handling in MetricStore
def initialize(aggregation_cardinality_limit: nil)
super(aggregation_cardinality_limit: aggregation_cardinality_limit)
# NOTE(review): presumably flipped to true on shutdown — confirm against
# the rest of this class (not visible in this hunk).
@stopped = false
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ module Export
class InMemoryMetricPullExporter < MetricReader
attr_reader :metric_snapshots

def initialize
super
# @param aggregation_cardinality_limit [Integer, nil] maximum number of
#   distinct attribute sets per metric stream, forwarded to the
#   MetricReader superclass; nil leaves the behavior to the metric
#   store's default — TODO confirm default handling in MetricStore
def initialize(aggregation_cardinality_limit: nil)
super(aggregation_cardinality_limit: aggregation_cardinality_limit)
# Collected snapshots accumulate here; @mutex presumably guards access
# from concurrent collect/pull calls — confirm in the rest of the class.
@metric_snapshots = []
@mutex = Mutex.new
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ module Export
class MetricReader
attr_reader :metric_store

def initialize
@metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new
# @param aggregation_cardinality_limit [Integer, nil] maximum number of
#   distinct attribute sets per metric stream; passed through to the
#   MetricStore as its cardinality_limit — nil defers to the store's
#   own default (TODO confirm default in MetricStore)
def initialize(aggregation_cardinality_limit: nil)
@metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new(cardinality_limit: aggregation_cardinality_limit)
end

def collect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ class PeriodicMetricReader < MetricReader
# @return a new instance of the {PeriodicMetricReader}.
def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)),
export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)),
exporter: nil)
super()
exporter: nil,
aggregation_cardinality_limit: nil)
super(aggregation_cardinality_limit: aggregation_cardinality_limit)

@export_interval = export_interval_millis / 1000.0
@export_timeout = export_timeout_millis / 1000.0
Expand Down
Loading