150 changes: 92 additions & 58 deletions exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb
@@ -1,3 +1,6 @@



# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
@@ -8,6 +11,7 @@
require 'opentelemetry/sdk'
require 'net/http'
require 'zlib'
require 'json'

require 'google/rpc/status_pb'

@@ -53,6 +57,8 @@ def initialize(endpoint: nil,
headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}),
compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'),
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10),
# content_type may be 'application/x-protobuf' (default) or 'application/json'
content_type: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_CONTENT_TYPE', 'OTEL_EXPORTER_OTLP_CONTENT_TYPE', default: 'application/x-protobuf'),
metrics_reporter: nil)
@uri = prepare_endpoint(endpoint)

@@ -62,6 +68,7 @@ def initialize(endpoint: nil,

@path = @uri.path
@headers = prepare_headers(headers)
@content_type = content_type
@timeout = timeout.to_f
@compression = compression
@metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
@@ -103,26 +110,6 @@ def shutdown(timeout: nil)

private

# Builds span flags based on whether the parent span context is remote.
# This follows the OTLP specification for span flags.
def build_span_flags(parent_span_is_remote, base_flags)
# Extract integer value from TraceFlags object if needed
# Derive the low 8-bit W3C trace flags using the public API.
base_flags_int =
if base_flags.sampled?
1
else
0
end

has_remote_mask = Opentelemetry::Proto::Trace::V1::SpanFlags::SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK
is_remote_mask = Opentelemetry::Proto::Trace::V1::SpanFlags::SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK

flags = base_flags_int | has_remote_mask
flags |= is_remote_mask if parent_span_is_remote
flags
end

def http_connection(uri, ssl_verify_mode, certificate_file, client_certificate_file, client_key_file)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = uri.scheme == 'https'
@@ -159,7 +146,7 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity,
body = bytes
end
request.body = body
request.add_field('Content-Type', 'application/x-protobuf')
request.add_field('Content-Type', @content_type)
@headers.each { |key, value| request.add_field(key, value) }

retry_count = 0
@@ -192,7 +179,7 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity,
log_request_failure(response.code)
FAILURE
when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError
log_status(response.body)
log_status(response.body, response['Content-Type'])
@metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response.code })
FAILURE
when Net::HTTPRedirection
@@ -239,15 +226,53 @@ def handle_redirect(location)
# TODO: figure out destination and reinitialize @http and @path
end

def log_status(body)
status = Google::Rpc::Status.decode(body)
details = status.details.map do |detail|
klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass
detail.unpack(klass_or_nil) if klass_or_nil
end.compact
OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}} for uri=#{@uri}")
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status')
# Attempts to decode rpc.Status from the response body. The exporter
# previously logged a full exception/backtrace when decoding failed for
# non-protobuf bodies returned by some collectors or proxies. To reduce
# noisy error logs in production we:
# - try protobuf decoding first
# - fall back to JSON parsing if protobuf parsing fails
# - if both fail, record a concise message (no exception backtrace)
def log_status(body, content_type = nil)
return if body.nil? || body.empty?

# Protobuf decoding is attempted first regardless of the content_type
# hint; parse failures are treated as expected, non-fatal conditions
# and fall through to the JSON and plain-text handling below.
begin
status = Google::Rpc::Status.decode(body)
details = status.details.map do |detail|
klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass
detail.unpack(klass_or_nil) if klass_or_nil
end.compact
OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}} for uri=#{@uri}")
return
rescue ::Google::Protobuf::ParseError
# fall through to try JSON or plain-text handling
rescue StandardError => e
# For any other unexpected error during decode, log a concise message
# without surfacing the full exception backtrace.
OpenTelemetry.handle_error(message: "unexpected error decoding rpc.Status in OTLP::Exporter#log_status - #{e.class}: #{e.message}")
return
end

# If protobuf decode failed, try to interpret the body as JSON.
begin
parsed = JSON.parse(body)
if parsed.is_a?(Hash) && parsed['message']
OpenTelemetry.handle_error(message: "OTLP exporter received status message=#{parsed['message']} details=#{parsed['details'] || []} for uri=#{@uri}")
return
end
rescue JSON::ParserError
# fall through to final fallback
rescue StandardError => e
OpenTelemetry.handle_error(message: "unexpected error parsing rpc.Status body as JSON - #{e.class}: #{e.message}")
return
end

# Last resort: log a concise message that includes the (possibly
# non-protobuf) body, without an exception backtrace.
OpenTelemetry.handle_error(message: "OTLP exporter received an unrecognized error body for uri=#{@uri}: #{body.inspect}")
end

def log_request_failure(response_code)
@@ -296,30 +321,41 @@ def backoff?(retry_count:, reason:, retry_after: nil) # rubocop:disable Metrics/

def encode(span_data) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode(
Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.new(
resource_spans: span_data
.group_by(&:resource)
.map do |resource, span_datas|
Opentelemetry::Proto::Trace::V1::ResourceSpans.new(
resource: Opentelemetry::Proto::Resource::V1::Resource.new(
attributes: resource.attribute_enumerator.map { |key, value| as_otlp_key_value(key, value) }
),
scope_spans: span_datas
.group_by(&:instrumentation_scope)
.map do |il, sds|
Opentelemetry::Proto::Trace::V1::ScopeSpans.new(
scope: Opentelemetry::Proto::Common::V1::InstrumentationScope.new(
name: il.name,
version: il.version
),
spans: sds.map { |sd| as_otlp_span(sd) }
)
end
)
end
)
request_message = Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.new(
resource_spans: span_data
.group_by(&:resource)
.map do |resource, span_datas|
Opentelemetry::Proto::Trace::V1::ResourceSpans.new(
resource: Opentelemetry::Proto::Resource::V1::Resource.new(
attributes: resource.attribute_enumerator.map { |key, value| as_otlp_key_value(key, value) }
),
scope_spans: span_datas
.group_by(&:instrumentation_scope)
.map do |il, sds|
Opentelemetry::Proto::Trace::V1::ScopeSpans.new(
scope: Opentelemetry::Proto::Common::V1::InstrumentationScope.new(
name: il.name,
version: il.version
),
spans: sds.map { |sd| as_otlp_span(sd) }
)
end
)
end
)

if @content_type == 'application/json'
# Try to use protobuf message JSON serialization; fall back to protobuf bytes on error.
begin
# google-protobuf messages generally respond to #to_json
return request_message.to_json
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'failed to serialize OTLP request to JSON; falling back to protobuf')
return Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode(request_message)
end
end

Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode(request_message)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'unexpected error in OTLP::Exporter#encode')
nil
@@ -356,9 +392,8 @@ def as_otlp_span(span_data) # rubocop:disable Metrics/MethodLength, Metrics/Cycl
trace_id: link.span_context.trace_id,
span_id: link.span_context.span_id,
trace_state: link.span_context.tracestate.to_s,
attributes: link.attributes&.map { |k, v| as_otlp_key_value(k, v) },
attributes: link.attributes&.map { |k, v| as_otlp_key_value(k, v) }
# TODO: track dropped_attributes_count in Span#trim_links
flags: build_span_flags(link.span_context.remote?, link.span_context.trace_flags)
)
end,
dropped_links_count: span_data.total_recorded_links - span_data.links&.size.to_i,
@@ -367,8 +402,7 @@ def as_otlp_span(span_data) # rubocop:disable Metrics/MethodLength, Metrics/Cycl
code: as_otlp_status_code(status.code),
message: status.description
)
end,
flags: build_span_flags(span_data.parent_span_is_remote, span_data.trace_flags)
end
)
end

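A minimal usage sketch, assuming the content_type keyword and the OTEL_EXPORTER_OTLP_CONTENT_TYPE environment variable introduced above; other names are the existing public opentelemetry-ruby API:

# Minimal sketch: select JSON encoding for the OTLP/HTTP exporter.
# Assumes the content_type option added in this change; the default
# remains 'application/x-protobuf'.
require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'

exporter = OpenTelemetry::Exporter::OTLP::Exporter.new(
  endpoint: 'http://localhost:4318/v1/traces',
  content_type: 'application/json'
)

OpenTelemetry::SDK.configure do |c|
  c.add_span_processor(
    OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(exporter)
  )
end

# The same selection could presumably be made without code changes via:
#   OTEL_EXPORTER_OTLP_CONTENT_TYPE=application/json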