From a0314a816344ae1b306a006089af46b1c852241a Mon Sep 17 00:00:00 2001 From: Zack Young Date: Fri, 21 Nov 2025 12:56:44 -0700 Subject: [PATCH 1/3] fix: Sidekiq spans crashing when enqueued by Exq Exq doesn't seem to pass "created_at" with its messages. [Current Sidekiq API](https://github.com/sidekiq/sidekiq/blob/f9e0a026543c93ae49c1d1d41c6592d2877d318a/lib/sidekiq/api.rb#L456) defaults `created_at` to `enqueued_at` or `0` if it's not available, so we can mimic that method for safely defaulting thee values. --- .../middlewares/server/tracer_middleware.rb | 4 +-- .../server/tracer_middleware_test.rb | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb index 47c3d7dce7..cdaf5300a1 100644 --- a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb +++ b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb @@ -31,8 +31,8 @@ def call(_worker, msg, _queue) end extracted_context = OpenTelemetry.propagation.extract(msg) - created_at = time_from_timestamp(msg['created_at']) - enqueued_at = time_from_timestamp(msg['created_at']) + created_at = time_from_timestamp(msg['created_at'] || msg['enqueued_at'] || 0) + enqueued_at = time_from_timestamp(msg['enqueued_at'] || 0) OpenTelemetry::Context.with_current(extracted_context) do if instrumentation_config[:propagation_style] == :child tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span| diff --git a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb index 5c770bb78c..ed4620857d 100644 --- a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb +++ b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb @@ -67,6 +67,31 @@ _(job_span.attributes['messaging.operation']).must_equal 'process' end + it 'traces when enqueued through another system' do + payload = { 'queue' => 'default', 'args' => [], 'class' => SimpleJob, 'enqueued_at' => Time.current, 'jid' => '4' } + Sidekiq::Client.new.send(:raw_push, [payload]) + Sidekiq::Worker.drain_all + + _(job_span.name).must_equal 'default process' + _(job_span.kind).must_equal :consumer + _(job_span.attributes['messaging.system']).must_equal 'sidekiq' + _(job_span.attributes['messaging.sidekiq.job_class']).must_equal 'SimpleJob' + _(job_span.attributes['messaging.message_id']).must_equal '4' + _(job_span.attributes['messaging.destination']).must_equal 'default' + _(job_span.attributes['messaging.destination_kind']).must_equal 'queue' + _(job_span.attributes['messaging.operation']).must_equal 'process' + _(job_span.attributes['peer.service']).must_be_nil + _(job_span.events.size).must_equal(2) + + created_event = job_span.events[0] + _(created_event.name).must_equal('created_at') + _(created_event.timestamp.digits.count).must_equal(19) + + enqueued_event = job_span.events[1] + _(enqueued_event.name).must_equal('enqueued_at') + _(enqueued_event.timestamp.digits.count).must_equal(19) + end + it 'defaults to using links to the enqueing span but does not continue the trace' do SimpleJob.perform_async SimpleJob.drain From 040c291b4a3f3319d7cd08ecad50038df1be96db Mon Sep 17 00:00:00 2001 From: Zack Young Date: Thu, 27 Nov 2025 19:25:02 -0700 Subject: [PATCH 2/3] DRY up timestamp adding and only add events if the timestamp actually exists --- .../middlewares/server/tracer_middleware.rb | 15 +++++++++------ .../middlewares/server/tracer_middleware_test.rb | 8 ++------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb index cdaf5300a1..4153741948 100644 --- a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb +++ b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb @@ -31,13 +31,10 @@ def call(_worker, msg, _queue) end extracted_context = OpenTelemetry.propagation.extract(msg) - created_at = time_from_timestamp(msg['created_at'] || msg['enqueued_at'] || 0) - enqueued_at = time_from_timestamp(msg['enqueued_at'] || 0) OpenTelemetry::Context.with_current(extracted_context) do if instrumentation_config[:propagation_style] == :child tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span| - span.add_event('created_at', timestamp: created_at) - span.add_event('enqueued_at', timestamp: enqueued_at) + add_span_timestamps(span, msg) yield end else @@ -46,8 +43,7 @@ def call(_worker, msg, _queue) links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid? span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer) OpenTelemetry::Trace.with_span(span) do - span.add_event('created_at', timestamp: created_at) - span.add_event('enqueued_at', timestamp: enqueued_at) + add_span_timestamps(span, msg) yield rescue Exception => e # rubocop:disable Lint/RescueException span.record_exception(e) @@ -70,6 +66,13 @@ def tracer Sidekiq::Instrumentation.instance.tracer end + def add_span_timestamps(span, msg) + created_at = msg['created_at'] + enqueued_at = msg['enqueued_at'] + span.add_event('created_at', timestamp: time_from_timestamp(created_at)) if created_at + span.add_event('enqueued_at', timestamp: time_from_timestamp(enqueued_at)) if enqueued_at + end + def time_from_timestamp(timestamp) if timestamp.is_a?(Float) # old format, timestamps were stored as fractional seconds since the epoch diff --git a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb index ed4620857d..835ba234e9 100644 --- a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb +++ b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb @@ -81,13 +81,9 @@ _(job_span.attributes['messaging.destination_kind']).must_equal 'queue' _(job_span.attributes['messaging.operation']).must_equal 'process' _(job_span.attributes['peer.service']).must_be_nil - _(job_span.events.size).must_equal(2) - - created_event = job_span.events[0] - _(created_event.name).must_equal('created_at') - _(created_event.timestamp.digits.count).must_equal(19) + _(job_span.events.size).must_equal(1) - enqueued_event = job_span.events[1] + enqueued_event = job_span.events[0] _(enqueued_event.name).must_equal('enqueued_at') _(enqueued_event.timestamp.digits.count).must_equal(19) end From ed95854016a595665f48fc37b77b4e51ff8ba857 Mon Sep 17 00:00:00 2001 From: Zack Young Date: Thu, 27 Nov 2025 19:35:48 -0700 Subject: [PATCH 3/3] Rename test according to suggestion --- .../sidekiq/middlewares/server/tracer_middleware_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb index 835ba234e9..1d5272c6a5 100644 --- a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb +++ b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb @@ -67,7 +67,7 @@ _(job_span.attributes['messaging.operation']).must_equal 'process' end - it 'traces when enqueued through another system' do + it 'traces when enqueued with minimal data' do payload = { 'queue' => 'default', 'args' => [], 'class' => SimpleJob, 'enqueued_at' => Time.current, 'jid' => '4' } Sidekiq::Client.new.send(:raw_push, [payload]) Sidekiq::Worker.drain_all