diff --git a/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb b/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb index 603c95142..2e91fd9e7 100644 --- a/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb +++ b/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb @@ -1,3 +1,6 @@ + + + # frozen_string_literal: true # Copyright The OpenTelemetry Authors @@ -8,6 +11,7 @@ require 'opentelemetry/sdk' require 'net/http' require 'zlib' +require 'json' require 'google/rpc/status_pb' @@ -53,6 +57,8 @@ def initialize(endpoint: nil, headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}), compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'), timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10), + # content_type may be 'application/x-protobuf' (default) or 'application/json' + content_type: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_TRACES_CONTENT_TYPE', 'OTEL_EXPORTER_OTLP_CONTENT_TYPE', default: 'application/x-protobuf'), metrics_reporter: nil) @uri = prepare_endpoint(endpoint) @@ -62,6 +68,7 @@ def initialize(endpoint: nil, @path = @uri.path @headers = prepare_headers(headers) + @content_type = content_type @timeout = timeout.to_f @compression = compression @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter @@ -103,26 +110,6 @@ def shutdown(timeout: nil) private - # Builds span flags based on whether the parent span context is remote. - # This follows the OTLP specification for span flags. - def build_span_flags(parent_span_is_remote, base_flags) - # Extract integer value from TraceFlags object if needed - # Derive the low 8-bit W3C trace flags using the public API. - base_flags_int = - if base_flags.sampled? - 1 - else - 0 - end - - has_remote_mask = Opentelemetry::Proto::Trace::V1::SpanFlags::SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK - is_remote_mask = Opentelemetry::Proto::Trace::V1::SpanFlags::SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK - - flags = base_flags_int | has_remote_mask - flags |= is_remote_mask if parent_span_is_remote - flags - end - def http_connection(uri, ssl_verify_mode, certificate_file, client_certificate_file, client_key_file) http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = uri.scheme == 'https' @@ -159,7 +146,7 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity, body = bytes end request.body = body - request.add_field('Content-Type', 'application/x-protobuf') + request.add_field('Content-Type', @content_type) @headers.each { |key, value| request.add_field(key, value) } retry_count = 0 @@ -192,7 +179,7 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity, log_request_failure(response.code) FAILURE when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError - log_status(response.body) + log_status(response.body, response['Content-Type']) @metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response.code }) FAILURE when Net::HTTPRedirection @@ -239,15 +226,53 @@ def handle_redirect(location) # TODO: figure out destination and reinitialize @http and @path end - def log_status(body) - status = Google::Rpc::Status.decode(body) - details = status.details.map do |detail| - klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass - detail.unpack(klass_or_nil) if klass_or_nil - end.compact - OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}} for uri=#{@uri}") - rescue StandardError => e - OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status') + # Attempts to decode rpc.Status from the response body. The exporter + # previously logged a full exception/backtrace when decoding failed for + # non-protobuf bodies returned by some collectors or proxies. To reduce + # noisy error logs in production we: + # - try protobuf decoding first + # - fall back to JSON parsing if protobuf parsing fails + # - if both fail, record a concise message (no exception backtrace) + def log_status(body, content_type = nil) + return if body.nil? || body.empty? + + # Prefer protobuf decoding when content type suggests protobuf or + # when the body looks binary. Attempt decode and only treat parsing + # failures as non-fatal expected conditions. + begin + status = Google::Rpc::Status.decode(body) + details = status.details.map do |detail| + klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass + detail.unpack(klass_or_nil) if klass_or_nil + end.compact + OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}} for uri=#{@uri}") + return + rescue ::Google::Protobuf::ParseError + # fall through to try JSON or plain-text handling + rescue StandardError => e + # For any unexpected errors in decode, log a concise message but + # avoid surfacing the full backtrace for common parse failures. + OpenTelemetry.handle_error(message: "unexpected error decoding rpc.Status in OTLP::Exporter#log_status - #{e.class}: #{e.message}") + return + end + + # If protobuf decode failed, try to interpret the body as JSON. + begin + parsed = JSON.parse(body) + if parsed.is_a?(Hash) && parsed['message'] + OpenTelemetry.handle_error(message: "OTLP exporter received status message=#{parsed['message']} details=#{parsed['details'] || []} for uri=#{@uri}") + return + end + rescue JSON::ParserError + # fall through to final fallback + rescue StandardError => e + OpenTelemetry.handle_error(message: "unexpected error parsing rpc.Status body as JSON - #{e.class}: #{e.message}") + return + end + + # Last resort: log a concise message showing the (possibly non-protobuf) + # body. Keep it short to avoid noisy backtraces in production. + OpenTelemetry.handle_error(message: "OTLP exporter received an unrecognized error body for uri=#{@uri}: #{body.inspect}") end def log_request_failure(response_code) @@ -296,30 +321,41 @@ def backoff?(retry_count:, reason:, retry_after: nil) # rubocop:disable Metrics/ def encode(span_data) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity start = Process.clock_gettime(Process::CLOCK_MONOTONIC) - Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode( - Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.new( - resource_spans: span_data - .group_by(&:resource) - .map do |resource, span_datas| - Opentelemetry::Proto::Trace::V1::ResourceSpans.new( - resource: Opentelemetry::Proto::Resource::V1::Resource.new( - attributes: resource.attribute_enumerator.map { |key, value| as_otlp_key_value(key, value) } - ), - scope_spans: span_datas - .group_by(&:instrumentation_scope) - .map do |il, sds| - Opentelemetry::Proto::Trace::V1::ScopeSpans.new( - scope: Opentelemetry::Proto::Common::V1::InstrumentationScope.new( - name: il.name, - version: il.version - ), - spans: sds.map { |sd| as_otlp_span(sd) } - ) - end - ) - end - ) + request_message = Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.new( + resource_spans: span_data + .group_by(&:resource) + .map do |resource, span_datas| + Opentelemetry::Proto::Trace::V1::ResourceSpans.new( + resource: Opentelemetry::Proto::Resource::V1::Resource.new( + attributes: resource.attribute_enumerator.map { |key, value| as_otlp_key_value(key, value) } + ), + scope_spans: span_datas + .group_by(&:instrumentation_scope) + .map do |il, sds| + Opentelemetry::Proto::Trace::V1::ScopeSpans.new( + scope: Opentelemetry::Proto::Common::V1::InstrumentationScope.new( + name: il.name, + version: il.version + ), + spans: sds.map { |sd| as_otlp_span(sd) } + ) + end + ) + end ) + + if @content_type == 'application/json' + # Try to use protobuf message JSON serialization; fall back to protobuf bytes on error. + begin + # google-protobuf messages generally respond to #to_json + return request_message.to_json + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'failed to serialize OTLP request to JSON; falling back to protobuf') + return Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode(request_message) + end + end + + Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode(request_message) rescue StandardError => e OpenTelemetry.handle_error(exception: e, message: 'unexpected error in OTLP::Exporter#encode') nil @@ -356,9 +392,8 @@ def as_otlp_span(span_data) # rubocop:disable Metrics/MethodLength, Metrics/Cycl trace_id: link.span_context.trace_id, span_id: link.span_context.span_id, trace_state: link.span_context.tracestate.to_s, - attributes: link.attributes&.map { |k, v| as_otlp_key_value(k, v) }, + attributes: link.attributes&.map { |k, v| as_otlp_key_value(k, v) } # TODO: track dropped_attributes_count in Span#trim_links - flags: build_span_flags(link.span_context.remote?, link.span_context.trace_flags) ) end, dropped_links_count: span_data.total_recorded_links - span_data.links&.size.to_i, @@ -367,8 +402,7 @@ def as_otlp_span(span_data) # rubocop:disable Metrics/MethodLength, Metrics/Cycl code: as_otlp_status_code(status.code), message: status.description ) - end, - flags: build_span_flags(span_data.parent_span_is_remote, span_data.trace_flags) + end ) end