Skip to content

Commit 298c697

Browse files
authored
Add support for WithStreamingInputSpans (#20)
* Add support for WithStreamingInputSpans * Fix deprecated fields * Fix typo
1 parent 75604ab commit 298c697

14 files changed

+170
-35
lines changed

Directory.Build.props

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
<Import Project="version.props" />
33
<PropertyGroup>
44
<Authors>Benjamin Krämer</Authors>
5-
<PackageIconUrl>https://avatars0.githubusercontent.com/u/15482765</PackageIconUrl>
5+
<PackageIcon>opentracing-icon.png</PackageIcon>
66
<PackageProjectUrl>https://github.com/opentracing-contrib/csharp-grpc</PackageProjectUrl>
7-
<PackageLicenseUrl>https://raw.githubusercontent.com/opentracing-contrib/csharp-grpc/master/LICENSE</PackageLicenseUrl>
7+
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
88
<PackageReleaseNotes Condition="'$(Version)' != ''">https://github.com/opentracing-contrib/csharp-grpc/releases/tag/v$(Version)</PackageReleaseNotes>
99
<RepositoryType>git</RepositoryType>
1010
<RepositoryUrl>git://github.com/opentracing-contrib/csharp-grpc</RepositoryUrl>
@@ -16,4 +16,8 @@
1616
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
1717
</PropertyGroup>
1818

19+
<ItemGroup>
20+
<None Include="$(SolutionDir)/images/opentracing-icon.png" Pack="true" PackagePath="" />
21+
</ItemGroup>
22+
1923
</Project>

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ using OpenTracing.Contrib.Grpc;
7676
A `ServerTracingInterceptor` uses default settings, which you can override by creating it using a `ServerTracingInterceptor.Builder`.
7777

7878
- `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted server. Default is the name of the RPC method. More details in the `Operation Name` section.
79-
- `WithStreaming()`: Logs to the server span whenever a message is received. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
79+
- `WithStreaming()`: Logs to the server span whenever a message is is received or a response sent. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
80+
- `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
8081
- `WithVerbosity()`: Logs to the server span additional events, such as message received, headers received and call complete. Default only logs if a call is cancelled.
8182
- `WithTracedAttributes(params ServerRequestAttribute[] attrs)`: Sets tags on the server span in case you want to track information about the RPC call.
8283

@@ -86,6 +87,7 @@ A `ServerTracingInterceptor` uses default settings, which you can override by cr
8687
ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor
8788
.Builder(tracer)
8889
.WithStreaming()
90+
.WithStreamingInputSpans()
8991
.WithVerbosity()
9092
.WithOperationName(new PrefixOperationNameConstructor("Server"))
9193
.WithTracedAttributes(ServerTracingConfiguration.RequestAttribute.Headers,
@@ -99,6 +101,7 @@ A `ClientTracingInterceptor` also has default settings, which you can override b
99101

100102
- `WithOperationName(IOperationNameConstructor operationName)`: Define how the operation name is constructed for all spans created for this intercepted client. Default is the name of the RPC method. More details in the `Operation Name` section.
101103
- `WithStreaming()`: Logs to the client span whenever a message is sent or a response is received. *Note:* This package supports streaming but has not been rigorously tested. If you come across any issues, please let us know.
104+
- `WithStreamingInputSpans()`: Creates a child span for each incoming message. This is adviced when using long-running streams as the calls' span is only finished when the connection is closed.
102105
- `WithVerbosity()`: Logs to the client span additional events, such as call started, message sent, headers received, response received, and call complete. Default only logs if a call is cancelled.
103106
- `WithTracedAttributes(params ClientRequestAttribute[] attrs)`: Sets tags on the client span in case you want to track information about the RPC call.
104107
- `WithWaitForReady()`: Enables WaitForReady on all RPC calls.
@@ -117,6 +120,7 @@ public class CustomOperationNameConstructor : IOperationNameConstructor
117120
ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor
118121
.Builder(tracer)
119122
.WithStreaming()
123+
.WithStreamingInputSpans()
120124
.WithVerbosity()
121125
.WithOperationName(new CustomOperationNameConstructor())
122126
.WithTracingAttributes(ClientTracingConfiguration.RequestAttribute.AllCallOptions,

images/opentracing-icon.png

13.3 KB
Loading

src/OpenTracing.Contrib.Grpc/Configuration/ClientTracingConfiguration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ internal ClientTracingConfiguration(ITracer tracer) : base(tracer)
2626
TracedAttributes = new HashSet<RequestAttribute>();
2727
}
2828

29-
internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool verbose, ISet<RequestAttribute> tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken)
30-
: base(tracer, operationNameConstructor, streaming, verbose)
29+
internal ClientTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes, bool waitForReady, CancellationToken fallbackCancellationToken)
30+
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose)
3131
{
3232
TracedAttributes = tracedAttributes ?? new HashSet<RequestAttribute>();
3333
WaitForReady = waitForReady;

src/OpenTracing.Contrib.Grpc/Configuration/ServerTracingConfiguration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ internal ServerTracingConfiguration(ITracer tracer) : base(tracer)
2020
TracedAttributes = new HashSet<RequestAttribute>();
2121
}
2222

23-
internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool verbose, ISet<RequestAttribute> tracedAttributes)
24-
: base(tracer, operationNameConstructor, streaming, verbose)
23+
internal ServerTracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor, bool streaming, bool streamingInputSpans, bool verbose, ISet<RequestAttribute> tracedAttributes)
24+
: base(tracer, operationNameConstructor, streaming, streamingInputSpans, verbose)
2525
{
2626
TracedAttributes = tracedAttributes ?? new HashSet<RequestAttribute>();
2727
}

src/OpenTracing.Contrib.Grpc/Configuration/TracingConfiguration.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ public abstract class TracingConfiguration
77
public ITracer Tracer { get; }
88
public IOperationNameConstructor OperationNameConstructor { get; }
99
public bool Streaming { get; }
10+
public bool StreamingInputSpans { get; }
1011
public bool Verbose { get; }
1112

12-
protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool verbose = false)
13+
protected TracingConfiguration(ITracer tracer, IOperationNameConstructor operationNameConstructor = null, bool streaming = false, bool streamingInputSpans = false, bool verbose = false)
1314
{
1415
Tracer = tracer;
1516
OperationNameConstructor = operationNameConstructor ?? new DefaultOperationNameConstructor();
1617
Streaming = streaming;
18+
StreamingInputSpans = streamingInputSpans;
1719
Verbose = verbose;
1820
}
1921
}

src/OpenTracing.Contrib.Grpc/GrpcTraceLogger.cs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ internal class GrpcTraceLogger<TRequest, TResponse>
1212
private readonly ISpan _span;
1313
private readonly TracingConfiguration _configuration;
1414

15+
private IScope _scope;
16+
private bool _isFinished;
17+
18+
private ISpan ScopeSpan => _scope?.Span ?? _span;
19+
1520
public GrpcTraceLogger(ISpan span, TracingConfiguration configuration)
1621
{
1722
_span = span;
@@ -34,11 +39,27 @@ public void ResponseHeader(Metadata metadata)
3439
});
3540
}
3641

42+
public void BeginScope(string operationName)
43+
{
44+
if (!(_configuration.StreamingInputSpans || _configuration.Verbose)) return;
45+
46+
_scope = _configuration.Tracer.BuildSpan(operationName).StartActive(false);
47+
}
48+
49+
public void EndScope()
50+
{
51+
if (_scope == null || !(_configuration.StreamingInputSpans || _configuration.Verbose)) return;
52+
53+
_scope.Span.Finish();
54+
_scope.Dispose();
55+
_scope = null;
56+
}
57+
3758
public void Request(TRequest req)
3859
{
3960
if (!(_configuration.Streaming || _configuration.Verbose)) return;
4061

41-
_span.Log(new Dictionary<string, object>
62+
ScopeSpan.Log(new Dictionary<string, object>
4263
{
4364
{ LogFields.Event, "gRPC request" },
4465
{ "data", req }
@@ -49,7 +70,7 @@ public void Response(TResponse rsp)
4970
{
5071
if (!(_configuration.Streaming || _configuration.Verbose)) return;
5172

52-
_span.Log(new Dictionary<string, object>
73+
ScopeSpan.Log(new Dictionary<string, object>
5374
{
5475
{ LogFields.Event, "gRPC response" },
5576
{ "data", rsp }
@@ -62,7 +83,7 @@ public void FinishSuccess()
6283
{
6384
_span.Log("Call completed");
6485
}
65-
_span.Finish();
86+
Finish();
6687
}
6788

6889
public void FinishException(Exception ex)
@@ -71,8 +92,17 @@ public void FinishException(Exception ex)
7192
{
7293
_span.Log("Call failed");
7394
}
74-
_span.SetException(ex)
75-
.Finish();
95+
_span.SetException(ex);
96+
Finish();
97+
}
98+
99+
private void Finish()
100+
{
101+
if (_isFinished) return;
102+
103+
EndScope();
104+
_span.Finish();
105+
_isFinished = true;
76106
}
77107
}
78108
}

src/OpenTracing.Contrib.Grpc/Handler/InterceptedClientHandler.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ internal class InterceptedClientHandler<TRequest, TResponse>
1818
private readonly ClientTracingConfiguration _configuration;
1919
private readonly ClientInterceptorContext<TRequest, TResponse> _context;
2020
private readonly GrpcTraceLogger<TRequest, TResponse> _logger;
21+
private readonly TracingAsyncStreamReader<TResponse>.StreamActions _streamActions;
2122

2223
public InterceptedClientHandler(ClientTracingConfiguration configuration, ClientInterceptorContext<TRequest, TResponse> context)
2324
{
@@ -33,6 +34,9 @@ public InterceptedClientHandler(ClientTracingConfiguration configuration, Client
3334
var span = InitializeSpanWithHeaders();
3435
_logger = new GrpcTraceLogger<TRequest, TResponse>(span, configuration);
3536
_configuration.Tracer.Inject(span.Context, BuiltinFormats.HttpHeaders, new MetadataCarrier(_context.Options.Headers));
37+
38+
var scopeActions = new ScopeActions("new_response", _logger.BeginScope, _logger.EndScope);
39+
_streamActions = new TracingAsyncStreamReader<TResponse>.StreamActions(scopeActions, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
3640
}
3741

3842
private CallOptions ApplyConfigToCallOptions(CallOptions callOptions)
@@ -143,7 +147,7 @@ public AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall(TRequest req
143147
{
144148
_logger.Request(request);
145149
var rspCnt = continuation(request, _context);
146-
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
150+
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _streamActions);
147151
var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader);
148152
return new AsyncServerStreamingCall<TResponse>(tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose);
149153
}
@@ -175,7 +179,7 @@ public AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall(In
175179
{
176180
var rspCnt = continuation(_context);
177181
var tracingRequestStream = new TracingClientStreamWriter<TRequest>(rspCnt.RequestStream, _logger.Request);
178-
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _logger.Response, _logger.FinishSuccess, _logger.FinishException);
182+
var tracingResponseStream = new TracingAsyncStreamReader<TResponse>(rspCnt.ResponseStream, _streamActions);
179183
var rspHeaderAsync = rspCnt.ResponseHeadersAsync.ContinueWith(LogResponseHeader);
180184
return new AsyncDuplexStreamingCall<TRequest, TResponse>(tracingRequestStream, tracingResponseStream, rspHeaderAsync, rspCnt.GetStatus, rspCnt.GetTrailers, rspCnt.Dispose);
181185
}

src/OpenTracing.Contrib.Grpc/Handler/InterceptedServerHandler.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class InterceptedServerHandler<TRequest, TResponse>
1616
private readonly ServerTracingConfiguration _configuration;
1717
private readonly ServerCallContext _context;
1818
private readonly GrpcTraceLogger<TRequest, TResponse> _logger;
19+
private readonly TracingAsyncStreamReader<TRequest>.StreamActions _streamActions;
1920

2021
public InterceptedServerHandler(ServerTracingConfiguration configuration, ServerCallContext context)
2122
{
@@ -24,6 +25,9 @@ public InterceptedServerHandler(ServerTracingConfiguration configuration, Server
2425

2526
var span = GetSpanFromContext();
2627
_logger = new GrpcTraceLogger<TRequest, TResponse>(span, configuration);
28+
29+
var scopeActions = new ScopeActions("new_request", _logger.BeginScope, _logger.EndScope);
30+
_streamActions = new TracingAsyncStreamReader<TRequest>.StreamActions(scopeActions, _logger.Request);
2731
}
2832

2933
private ISpan GetSpanFromContext()
@@ -83,7 +87,7 @@ public async Task<TResponse> ClientStreamingServerHandler(IAsyncStreamReader<TRe
8387
{
8488
try
8589
{
86-
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _logger.Request);
90+
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _streamActions);
8791
var response = await continuation(tracingRequestStream, _context).ConfigureAwait(false);
8892
_logger.Response(response);
8993
_logger.FinishSuccess();
@@ -116,7 +120,7 @@ public async Task DuplexStreamingServerHandler(IAsyncStreamReader<TRequest> requ
116120
{
117121
try
118122
{
119-
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _logger.Request);
123+
var tracingRequestStream = new TracingAsyncStreamReader<TRequest>(requestStream, _streamActions);
120124
var tracingResponseStream = new TracingServerStreamWriter<TResponse>(responseStream, _logger.Response);
121125
await continuation(tracingRequestStream, tracingResponseStream, _context).ConfigureAwait(false);
122126
_logger.FinishSuccess();

src/OpenTracing.Contrib.Grpc/Interceptors/ClientTracingInterceptor.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class Builder
6060
private readonly ITracer _tracer;
6161
private IOperationNameConstructor _operationNameConstructor;
6262
private bool _streaming;
63+
private bool _streamingInputSpans;
6364
private bool _verbose;
6465
private ISet<ClientTracingConfiguration.RequestAttribute> _tracedAttributes;
6566
private bool _waitForReady;
@@ -88,6 +89,16 @@ public Builder WithStreaming()
8889
return this;
8990
}
9091

92+
/// <summary>
93+
/// Creates a child span for each input message received.
94+
/// </summary>
95+
/// <returns>this Builder configured to create child spans</returns>
96+
public Builder WithStreamingInputSpans()
97+
{
98+
_streamingInputSpans = true;
99+
return this;
100+
}
101+
91102
/// <summary>
92103
/// Logs all request life-cycle events to client spans.
93104
/// </summary>
@@ -126,7 +137,7 @@ public Builder WithFallbackCancellationToken(CancellationToken cancellationToken
126137

127138
public ClientTracingInterceptor Build()
128139
{
129-
var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _verbose, _tracedAttributes, _waitForReady, _cancellationToken);
140+
var configuration = new ClientTracingConfiguration(_tracer, _operationNameConstructor, _streaming, _streamingInputSpans, _verbose, _tracedAttributes, _waitForReady, _cancellationToken);
130141
return new ClientTracingInterceptor(configuration);
131142
}
132143
}

0 commit comments

Comments
 (0)