|
| 1 | +# frozen_string_literal: true |
| 2 | + |
| 3 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 4 | +# SPDX-License-Identifier: Apache-2.0 |
| 5 | +# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. |
| 6 | + |
| 7 | +require 'socket' |
| 8 | +require 'base64' |
| 9 | +require 'opentelemetry' |
| 10 | +require 'opentelemetry/exporter/otlp' |
| 11 | +require 'opentelemetry/sdk' |
| 12 | + |
# Endpoint ("host:port") used when the caller does not supply one.
DEFAULT_ENDPOINT = 'localhost:2000'
# Header line written at the start of every UDP message, ahead of the signal prefix.
PROTOCOL_HEADER = %({"format":"json","version":1}\n)
# Default signal prefix placed between the header and the Base64 payload for traces.
DEFAULT_FORMAT_OTEL_TRACES_BINARY_PREFIX = 'T1S'
| 16 | + |
| 17 | +module AWS |
| 18 | + module Distro |
| 19 | + module OpenTelemetry |
| 20 | + module Exporter |
| 21 | + module XRay |
| 22 | + module UDP |
# Sends already-encoded payloads to a "host:port" endpoint over UDP.
#
# Every message is PROTOCOL_HEADER, followed by a signal prefix, followed by
# the strict Base64 encoding of the payload.
class UdpExporter
  # Highest valid UDP port number.
  MAX_PORT = 65_535
  private_constant :MAX_PORT

  # @param endpoint [String, nil] target as "host:port"; DEFAULT_ENDPOINT is
  #   used when nil.
  # @raise [RuntimeError] if the endpoint is not a well-formed "host:port".
  def initialize(endpoint = nil)
    @endpoint = endpoint || DEFAULT_ENDPOINT
    @host, @port = parse_endpoint(@endpoint)
    @socket = UDPSocket.new
  end

  # Base64-encodes +data+ and sends it as a single UDP message.
  #
  # @param data [String] raw payload bytes.
  # @param signal_prefix [String] marker inserted between the protocol
  #   header and the encoded payload.
  # @raise [StandardError] re-raises any error from the socket after logging it.
  def send_data(data, signal_prefix)
    base64_encoded_string = Base64.strict_encode64(data)
    message = "#{PROTOCOL_HEADER}#{signal_prefix}#{base64_encoded_string}"

    begin
      @socket.send(message.encode('utf-8'), 0, @host, @port)
    rescue StandardError => e
      ::OpenTelemetry.logger.error("Error sending UDP data: #{e}")
      # Bare raise re-raises the exception currently being handled.
      raise
    end
  end

  # Closes the underlying UDP socket.
  def shutdown
    @socket.close
  end

  private

  # Splits "host:port" into [host, port].
  #
  # The port is validated explicitly: String#to_i would silently turn a
  # missing or non-numeric port into 0 instead of reporting the bad endpoint.
  #
  # @param endpoint [String] address in "host:port" form.
  # @return [Array(String, Integer)] host and port number.
  # @raise [RuntimeError] if the host is empty, the port is missing,
  #   non-numeric or outside 1..65535, or extra ":" segments are present.
  def parse_endpoint(endpoint)
    host, port, extra = endpoint.to_s.strip.split(':', 3)
    well_formed = extra.nil? && !host.nil? && !host.empty? &&
                  !port.nil? && port.match?(/\A\d{1,5}\z/)
    port_number = port.to_i if well_formed
    raise "Invalid endpoint: #{endpoint}" unless well_formed && port_number.between?(1, MAX_PORT)

    [host, port_number]
  end
end
| 56 | + |
# A trace exporter that sends spans over UDP in an X-Ray format.
#
# Spans are serialized into an OTLP ExportTraceServiceRequest protobuf and
# handed to a UdpExporter together with the configured signal prefix.
class AWSXRayUDPSpanExporter < ::OpenTelemetry::SDK::Trace::Export::SpanExporter # rubocop:disable Metrics/ClassLength
  SUCCESS = ::OpenTelemetry::SDK::Trace::Export::SUCCESS
  FAILURE = ::OpenTelemetry::SDK::Trace::Export::FAILURE
  private_constant(:SUCCESS, :FAILURE)

  # @param endpoint [String, nil] "host:port" to send to. When nil, the
  #   address from AWS_XRAY_DAEMON_ADDRESS is used if AWS_LAMBDA_FUNCTION_NAME
  #   is set; otherwise DEFAULT_ENDPOINT is used.
  # @param signal_prefix [String] prefix placed before the encoded payload.
  def initialize(endpoint = nil, signal_prefix = DEFAULT_FORMAT_OTEL_TRACES_BINARY_PREFIX)
    @endpoint = endpoint.nil? ? default_endpoint : endpoint
    @udp_exporter = AWS::Distro::OpenTelemetry::Exporter::XRay::UDP::UdpExporter.new(@endpoint)
    @signal_prefix = signal_prefix
    @shutdown = false
  end

  # Called to export sampled {::OpenTelemetry::SDK::Trace::SpanData} structs.
  #
  # @param [Enumerable<::OpenTelemetry::SDK::Trace::SpanData>] span_data the
  #   list of recorded {::OpenTelemetry::SDK::Trace::SpanData} structs to be
  #   exported.
  # @param [optional Numeric] timeout An optional timeout in seconds
  #   (accepted for interface compatibility; not used by this exporter).
  # @return [Integer] SUCCESS when the message was sent; FAILURE after
  #   shutdown, when encoding fails, or when sending raises.
  def export(span_data, timeout: nil)
    return FAILURE if @shutdown

    encoded_etsr = encode(span_data)
    return FAILURE if encoded_etsr.nil?

    begin
      @udp_exporter.send_data(encoded_etsr, @signal_prefix)
      SUCCESS
    rescue StandardError => e
      ::OpenTelemetry.logger.error("Error exporting spans: #{e}")
      FAILURE
    end
  end

  # Called when {::OpenTelemetry::SDK::Trace::TracerProvider#force_flush} is called, if
  # this exporter is registered to a {::OpenTelemetry::SDK::Trace::TracerProvider}
  # object. Nothing is buffered here, so this always reports SUCCESS.
  #
  # @param [optional Numeric] timeout An optional timeout in seconds.
  # @return [Integer] SUCCESS.
  def force_flush(timeout: nil)
    SUCCESS
  end

  # Called when {::OpenTelemetry::SDK::Trace::TracerProvider#shutdown} is called, if
  # this exporter is registered to a {::OpenTelemetry::SDK::Trace::TracerProvider}
  # object. Shuts down the UDP exporter; later calls to #export return FAILURE.
  #
  # @param [optional Numeric] timeout An optional timeout in seconds.
  # @return [Integer] SUCCESS.
  def shutdown(timeout: nil)
    @udp_exporter.shutdown
    @shutdown = true
    SUCCESS
  end

  private

  # Endpoint used when the caller did not pass one explicitly.
  def default_endpoint
    (lambda_environment? && xray_daemon_endpoint) || DEFAULT_ENDPOINT
  end

  # True when AWS_LAMBDA_FUNCTION_NAME is present in the environment.
  def lambda_environment?
    !ENV['AWS_LAMBDA_FUNCTION_NAME'].nil?
  end

  # Value of AWS_XRAY_DAEMON_ADDRESS, or nil when it is unset or blank so that
  # the caller falls back to DEFAULT_ENDPOINT.
  def xray_daemon_endpoint
    address = ENV['AWS_XRAY_DAEMON_ADDRESS']
    address unless address.nil? || address.strip.empty?
  end

  # The OpenTelemetry Authors code
  # https://github.com/open-telemetry/opentelemetry-ruby/blob/opentelemetry-exporter-otlp/v0.30.0/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb#L277-L396

  # Serializes span data into an encoded ExportTraceServiceRequest, grouping
  # spans by resource and then by instrumentation scope.
  #
  # @return [String, nil] the encoded request, or nil if encoding raised.
  def encode(span_data) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
    ::Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.encode(
      ::Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.new(
        resource_spans: span_data
          .group_by(&:resource)
          .map do |resource, span_datas|
            ::Opentelemetry::Proto::Trace::V1::ResourceSpans.new(
              resource: ::Opentelemetry::Proto::Resource::V1::Resource.new(
                attributes: resource.attribute_enumerator.map { |key, value| as_otlp_key_value(key, value) }
              ),
              scope_spans: span_datas
                .group_by(&:instrumentation_scope)
                .map do |il, sds|
                  ::Opentelemetry::Proto::Trace::V1::ScopeSpans.new(
                    scope: ::Opentelemetry::Proto::Common::V1::InstrumentationScope.new(
                      name: il.name,
                      version: il.version
                    ),
                    spans: sds.map { |sd| as_otlp_span(sd) }
                  )
                end
            )
          end
      )
    )
  rescue StandardError => e
    # Must be fully qualified: within AWS::Distro::OpenTelemetry a bare
    # `OpenTelemetry` resolves to that namespace rather than the API module.
    ::OpenTelemetry.handle_error(exception: e, message: 'unexpected error in OTLP::Exporter#encode')
    nil
  end

  # Converts one SpanData struct into an OTLP Span message, including its
  # attributes, events, links, status and the dropped-item counters.
  def as_otlp_span(span_data) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
    ::Opentelemetry::Proto::Trace::V1::Span.new(
      trace_id: span_data.trace_id,
      span_id: span_data.span_id,
      trace_state: span_data.tracestate.to_s,
      # An invalid parent span id is sent as nil.
      parent_span_id: span_data.parent_span_id == ::OpenTelemetry::Trace::INVALID_SPAN_ID ? nil : span_data.parent_span_id,
      name: span_data.name,
      kind: as_otlp_span_kind(span_data.kind),
      start_time_unix_nano: span_data.start_timestamp,
      end_time_unix_nano: span_data.end_timestamp,
      attributes: span_data.attributes&.map { |k, v| as_otlp_key_value(k, v) },
      dropped_attributes_count: span_data.total_recorded_attributes - span_data.attributes&.size.to_i,
      events: span_data.events&.map do |event|
        ::Opentelemetry::Proto::Trace::V1::Span::Event.new(
          time_unix_nano: event.timestamp,
          name: event.name,
          attributes: event.attributes&.map { |k, v| as_otlp_key_value(k, v) }
          # TODO: track dropped_attributes_count in Span#append_event
        )
      end,
      dropped_events_count: span_data.total_recorded_events - span_data.events&.size.to_i,
      links: span_data.links&.map do |link|
        ::Opentelemetry::Proto::Trace::V1::Span::Link.new(
          trace_id: link.span_context.trace_id,
          span_id: link.span_context.span_id,
          trace_state: link.span_context.tracestate.to_s,
          attributes: link.attributes&.map { |k, v| as_otlp_key_value(k, v) }
          # TODO: track dropped_attributes_count in Span#trim_links
        )
      end,
      dropped_links_count: span_data.total_recorded_links - span_data.links&.size.to_i,
      status: span_data.status&.yield_self do |status|
        ::Opentelemetry::Proto::Trace::V1::Status.new(
          code: as_otlp_status_code(status.code),
          message: status.description
        )
      end
    )
  end

  # Maps an API status code to the OTLP status code enum; anything other
  # than OK or ERROR maps to STATUS_CODE_UNSET.
  def as_otlp_status_code(code)
    case code
    when ::OpenTelemetry::Trace::Status::OK then ::Opentelemetry::Proto::Trace::V1::Status::StatusCode::STATUS_CODE_OK
    when ::OpenTelemetry::Trace::Status::ERROR then ::Opentelemetry::Proto::Trace::V1::Status::StatusCode::STATUS_CODE_ERROR
    else ::Opentelemetry::Proto::Trace::V1::Status::StatusCode::STATUS_CODE_UNSET
    end
  end

  # Maps a span kind symbol to the OTLP SpanKind enum; unrecognized kinds
  # map to SPAN_KIND_UNSPECIFIED.
  def as_otlp_span_kind(kind)
    case kind
    when :internal then ::Opentelemetry::Proto::Trace::V1::Span::SpanKind::SPAN_KIND_INTERNAL
    when :server then ::Opentelemetry::Proto::Trace::V1::Span::SpanKind::SPAN_KIND_SERVER
    when :client then ::Opentelemetry::Proto::Trace::V1::Span::SpanKind::SPAN_KIND_CLIENT
    when :producer then ::Opentelemetry::Proto::Trace::V1::Span::SpanKind::SPAN_KIND_PRODUCER
    when :consumer then ::Opentelemetry::Proto::Trace::V1::Span::SpanKind::SPAN_KIND_CONSUMER
    else ::Opentelemetry::Proto::Trace::V1::Span::SpanKind::SPAN_KIND_UNSPECIFIED
    end
  end

  # Builds an OTLP KeyValue. If the value cannot be converted
  # (Encoding::UndefinedConversionError), the error is reported and the
  # value is replaced by the string 'Encoding Error'.
  def as_otlp_key_value(key, value)
    ::Opentelemetry::Proto::Common::V1::KeyValue.new(key: key, value: as_otlp_any_value(value))
  rescue Encoding::UndefinedConversionError => e
    encoded_value = value.encode('UTF-8', invalid: :replace, undef: :replace, replace: '�')
    # Fully qualified for the same reason as in #encode.
    ::OpenTelemetry.handle_error(exception: e, message: "encoding error for key #{key} and value #{encoded_value}")
    ::Opentelemetry::Proto::Common::V1::KeyValue.new(key: key, value: as_otlp_any_value('Encoding Error'))
  end

  # Wraps a Ruby value in an OTLP AnyValue. Strings, Integers, Floats,
  # booleans and Arrays (converted element by element) are supported; any
  # other type yields an AnyValue with no field set.
  def as_otlp_any_value(value)
    result = ::Opentelemetry::Proto::Common::V1::AnyValue.new
    case value
    when String
      result.string_value = value
    when Integer
      result.int_value = value
    when Float
      result.double_value = value
    when true, false
      result.bool_value = value
    when Array
      values = value.map { |element| as_otlp_any_value(element) }
      result.array_value = ::Opentelemetry::Proto::Common::V1::ArrayValue.new(values: values)
    end
    result
  end
  # END The OpenTelemetry Authors code
end
| 249 | + end |
| 250 | + end |
| 251 | + end |
| 252 | + end |
| 253 | + end |
| 254 | +end |
0 commit comments