Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package com.uber.cadence.internal.compatibility.proto.serviceclient;

import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.cadence.api.v1.*;
import com.uber.cadence.api.v1.DomainAPIGrpc;
import com.uber.cadence.api.v1.MetaAPIGrpc;
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
Expand All @@ -32,7 +30,6 @@
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
import com.uber.cadence.internal.Version;
import com.uber.cadence.internal.tracing.TracingPropagator;
import com.uber.cadence.serviceclient.ClientOptions;
import com.uber.cadence.serviceclient.auth.IAuthorizationProvider;
import io.grpc.*;
Expand All @@ -41,13 +38,9 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,6 +109,7 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
if (!Strings.isNullOrEmpty(options.getIsolationGroup())) {
headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup());
}
mergeHeaders(headers, options.getHeaders());

Channel interceptedChannel =
ClientInterceptors.intercept(
Expand Down Expand Up @@ -205,117 +199,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
}

private ClientInterceptor newOpenTracingInterceptor(Tracer tracer) {
return new ClientInterceptor() {
private final TracingPropagator tracingPropagator = new TracingPropagator(tracer);
private final String OPERATIONFORMAT = "cadence-%s";

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Span span =
tracingPropagator.spanByServiceMethod(
String.format(OPERATIONFORMAT, method.getBareMethodName()));
Scope scope = tracer.activateSpan(span);
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
try {
super.onClose(status, trailers);
} finally {
span.finish();
scope.close();
}
}
},
headers);
}

@SuppressWarnings("unchecked")
@Override
public void sendMessage(ReqT message) {
if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
&& message instanceof StartWorkflowExecutionRequest) {
StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
Header newHeader = addTracingHeaders(request.getHeader());

// cast should not throw error as we are using the builder
message = (ReqT) request.toBuilder().setHeader(newHeader).build();
} else if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecutionAsync")
&& message instanceof StartWorkflowExecutionAsyncRequest) {
StartWorkflowExecutionAsyncRequest request =
(StartWorkflowExecutionAsyncRequest) message;
Header newHeader = addTracingHeaders(request.getRequest().getHeader());

// cast should not throw error as we are using the builder
message =
(ReqT)
request
.toBuilder()
.setRequest(request.getRequest().toBuilder().setHeader(newHeader))
.build();
} else if (Objects.equals(
method.getBareMethodName(), "SignalWithStartWorkflowExecution")
&& message instanceof SignalWithStartWorkflowExecutionRequest) {
SignalWithStartWorkflowExecutionRequest request =
(SignalWithStartWorkflowExecutionRequest) message;
Header newHeader = addTracingHeaders(request.getStartRequest().getHeader());

// cast should not throw error as we are using the builder
message =
(ReqT)
request
.toBuilder()
.setStartRequest(
request.getStartRequest().toBuilder().setHeader(newHeader))
.build();
} else if (Objects.equals(
method.getBareMethodName(), "SignalWithStartWorkflowExecutionAsync")
&& message instanceof SignalWithStartWorkflowExecutionAsyncRequest) {
SignalWithStartWorkflowExecutionAsyncRequest request =
(SignalWithStartWorkflowExecutionAsyncRequest) message;
Header newHeader =
addTracingHeaders(request.getRequest().getStartRequest().getHeader());

// cast should not throw error as we are using the builder
message =
(ReqT)
request
.toBuilder()
.setRequest(
request
.getRequest()
.toBuilder()
.setStartRequest(
request
.getRequest()
.getStartRequest()
.toBuilder()
.setHeader(newHeader)))
.build();
}
super.sendMessage(message);
}

private Header addTracingHeaders(Header header) {
Map<String, byte[]> headers = new HashMap<>();
tracingPropagator.inject(headers);
Header.Builder headerBuilder = header.toBuilder();
headers.forEach(
(k, v) ->
headerBuilder.putFields(
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build()));
return headerBuilder.build();
}
};
}
};
return new OpenTracingInterceptor(tracer);
}

private ClientInterceptor newTracingInterceptor() {
Expand Down Expand Up @@ -488,4 +372,22 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return next.newCall(method, callOptions.withDeadlineAfter(duration, TimeUnit.MILLISECONDS));
}
}

private static void mergeHeaders(Metadata metadata, Map<String, String> headers) {
if (headers == null) {
return;
}
for (Map.Entry<String, String> entry : headers.entrySet()) {
Metadata.Key<String> key = Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER);
// Allow headers to overwrite any defaults
if (metadata.containsKey(key)) {
metadata.removeAll(key);
}
// Only replace it if they specify a value.
// This allows for removing headers
if (!Strings.isNullOrEmpty(entry.getValue())) {
metadata.put(key, entry.getValue());
}
}
}
}
Loading