1717
1818import io .micrometer .core .instrument .MeterRegistry ;
1919import io .netty .buffer .ByteBuf ;
20- import io .netty .buffer .ByteBufAllocator ;
2120import io .opentracing .Tracer ;
2221import io .rsocket .Payload ;
2322import io .rsocket .RSocket ;
24- import io .rsocket .rpc .frames .Metadata ;
25- import io .rsocket .rpc .metrics .Metrics ;
26- import io .rsocket .rpc .tracing .Tag ;
27- import io .rsocket .rpc .tracing .Tracing ;
23+ import io .rsocket .ipc .encoders .CompositeMetadataEncoder ;
24+ import io .rsocket .ipc .encoders .PlainMetadataEncoder ;
25+ import io .rsocket .ipc .metrics .Metrics ;
26+ import io .rsocket .ipc .tracing .SimpleSpanContext ;
27+ import io .rsocket .ipc .tracing .Tag ;
28+ import io .rsocket .ipc .tracing .Tracing ;
2829import io .rsocket .util .ByteBufPayload ;
30+ import java .nio .charset .Charset ;
2931import java .util .HashMap ;
3032import java .util .Map ;
3133import java .util .Objects ;
3840public final class Client <I , O > {
3941
4042 private final String service ;
43+ private final MetadataEncoder metadataEncoder ;
4144 private final Marshaller <I > marshaller ;
4245 private final Unmarshaller <O > unmarshaller ;
4346 private final RSocket rSocket ;
@@ -46,12 +49,14 @@ public final class Client<I, O> {
4649
4750 private Client (
4851 final String service ,
52+ final MetadataEncoder metadataEncoder ,
4953 final Marshaller marshaller ,
5054 final Unmarshaller unmarshaller ,
5155 final RSocket rSocket ,
5256 final MeterRegistry meterRegistry ,
5357 final Tracer tracer ) {
5458 this .service = service ;
59+ this .metadataEncoder = metadataEncoder ;
5560 this .marshaller = marshaller ;
5661 this .unmarshaller = unmarshaller ;
5762 this .rSocket = rSocket ;
@@ -60,7 +65,15 @@ private Client(
6065 }
6166
6267 public interface R {
63- M rsocket (RSocket rSocket );
68+ E rsocket (RSocket rSocket );
69+ }
70+
71+ public interface E {
72+ M compositeMetadataEncoder ();
73+
74+ M plainMetadataEncoder ();
75+
76+ M customMetadataEncoder (MetadataEncoder encoder );
6477 }
6578
6679 public interface M {
@@ -217,9 +230,10 @@ <X> Functions.FireAndForget<X> genericFireAndForget(String route, Marshaller<X>
217230 doFireAndForget (service , route , rSocket , marshaller , o , byteBuf , metrics , tracing );
218231 }
219232
220- private static class Builder implements P , U , R , M , T {
233+ private static class Builder implements P , U , E , R , M , T {
221234 private final String service ;
222235 private Marshaller marshaller ;
236+ private MetadataEncoder encoder ;
223237 private MeterRegistry meterRegistry ;
224238 private Tracer tracer ;
225239 private RSocket rSocket ;
@@ -228,6 +242,24 @@ private Builder(String service) {
228242 this .service = service ;
229243 }
230244
245+ @ Override
246+ public M compositeMetadataEncoder () {
247+ this .encoder = new CompositeMetadataEncoder ();
248+ return this ;
249+ }
250+
251+ @ Override
252+ public M plainMetadataEncoder () {
253+ this .encoder = new PlainMetadataEncoder ("." , Charset .defaultCharset ());
254+ return this ;
255+ }
256+
257+ @ Override
258+ public M customMetadataEncoder (MetadataEncoder encoder ) {
259+ this .encoder = encoder ;
260+ return this ;
261+ }
262+
231263 @ Override
232264 public <I > U <I > marshall (Marshaller <I > marshaller ) {
233265 this .marshaller = Objects .requireNonNull (marshaller );
@@ -237,11 +269,11 @@ public <I> U<I> marshall(Marshaller<I> marshaller) {
237269 @ Override
238270 public Client unmarshall (Unmarshaller unmarshaller ) {
239271 Objects .requireNonNull (unmarshaller );
240- return new Client (service , marshaller , unmarshaller , rSocket , meterRegistry , tracer );
272+ return new Client (service , encoder , marshaller , unmarshaller , rSocket , meterRegistry , tracer );
241273 }
242274
243275 @ Override
244- public M rsocket (RSocket rSocket ) {
276+ public E rsocket (RSocket rSocket ) {
245277 this .rSocket = Objects .requireNonNull (rSocket );
246278 return this ;
247279 }
@@ -279,17 +311,23 @@ private <X> Mono<Void> doFireAndForget(
279311 Function <? super Publisher <Void >, ? extends Publisher <Void >> metrics ,
280312 Function <Map <String , String >, Function <? super Publisher <Void >, ? extends Publisher <Void >>>
281313 tracing ) {
282- try {
283- HashMap <String , String > map = new HashMap <>();
284- ByteBuf d = marshaller .apply (o );
285- ByteBuf t = Tracing .mapToByteBuf (ByteBufAllocator .DEFAULT , map );
286- ByteBuf m = Metadata .encode (ByteBufAllocator .DEFAULT , service , route , t , metadata );
287-
288- Payload payload = ByteBufPayload .create (d , m );
289- return r .fireAndForget (payload ).transform (metrics ).transform (tracing .apply (map ));
290- } catch (Throwable t ) {
291- return Mono .error (t );
292- }
314+ final HashMap <String , String > map = new HashMap <>();
315+ return Mono .defer (
316+ () -> {
317+ try {
318+ ByteBuf d = marshaller .apply (o );
319+ ByteBuf m =
320+ metadataEncoder .encode (metadata , new SimpleSpanContext (map ), service , route );
321+ metadata .release ();
322+ Payload payload = ByteBufPayload .create (d , m );
323+ return r .fireAndForget (payload );
324+ } catch (Throwable t ) {
325+ metadata .release ();
326+ return Mono .error (t );
327+ }
328+ })
329+ .transform (metrics )
330+ .transform (tracing .apply (map ));
293331 }
294332
295333 private <X , Y > Mono <Y > doRequestResponse (
@@ -303,27 +341,31 @@ private <X, Y> Mono<Y> doRequestResponse(
303341 Function <? super Publisher <Y >, ? extends Publisher <Y >> metrics ,
304342 Function <Map <String , String >, Function <? super Publisher <Y >, ? extends Publisher <Y >>>
305343 tracing ) {
306- try {
307- HashMap <String , String > map = new HashMap <>();
308- ByteBuf d = marshaller .apply (o );
309- ByteBuf t = Tracing .mapToByteBuf (ByteBufAllocator .DEFAULT , map );
310- ByteBuf m = Metadata .encode (ByteBufAllocator .DEFAULT , service , route , t , metadata );
311-
312- Payload payload = ByteBufPayload .create (d , m );
313- return r .requestResponse (payload )
314- .map (
315- p -> {
316- try {
317- return unmarshaller .apply (p .sliceData ());
318- } finally {
319- p .release ();
320- }
321- })
322- .transform (metrics )
323- .transform (tracing .apply (map ));
324- } catch (Throwable t ) {
325- return Mono .error (t );
326- }
344+ final HashMap <String , String > map = new HashMap <>();
345+ return Mono .defer (
346+ () -> {
347+ try {
348+ ByteBuf d = marshaller .apply (o );
349+ ByteBuf m =
350+ metadataEncoder .encode (metadata , new SimpleSpanContext (map ), service , route );
351+ metadata .release ();
352+ Payload payload = ByteBufPayload .create (d , m );
353+ return r .requestResponse (payload );
354+ } catch (Throwable t ) {
355+ metadata .release ();
356+ return Mono .error (t );
357+ }
358+ })
359+ .map (
360+ p -> {
361+ try {
362+ return unmarshaller .apply (p .sliceData ());
363+ } finally {
364+ p .release ();
365+ }
366+ })
367+ .transform (metrics )
368+ .transform (tracing .apply (map ));
327369 }
328370
329371 private <X , Y > Flux <Y > doRequestStream (
@@ -337,27 +379,31 @@ private <X, Y> Flux<Y> doRequestStream(
337379 Function <? super Publisher <Y >, ? extends Publisher <Y >> metrics ,
338380 Function <Map <String , String >, Function <? super Publisher <Y >, ? extends Publisher <Y >>>
339381 tracing ) {
340- try {
341- HashMap <String , String > map = new HashMap <>();
342- ByteBuf d = marshaller .apply (o );
343- ByteBuf t = Tracing .mapToByteBuf (ByteBufAllocator .DEFAULT , map );
344- ByteBuf m = Metadata .encode (ByteBufAllocator .DEFAULT , service , route , t , metadata );
345-
346- Payload payload = ByteBufPayload .create (d , m );
347- return r .requestStream (payload )
348- .map (
349- p -> {
350- try {
351- return unmarshaller .apply (p .sliceData ());
352- } finally {
353- p .release ();
354- }
355- })
356- .transform (metrics )
357- .transform (tracing .apply (map ));
358- } catch (Throwable t ) {
359- return Flux .error (t );
360- }
382+ final HashMap <String , String > map = new HashMap <>();
383+ return Flux .defer (
384+ () -> {
385+ try {
386+ ByteBuf d = marshaller .apply (o );
387+ ByteBuf m =
388+ metadataEncoder .encode (metadata , new SimpleSpanContext (map ), service , route );
389+ metadata .release ();
390+ Payload payload = ByteBufPayload .create (d , m );
391+ return r .requestStream (payload );
392+ } catch (Throwable t ) {
393+ metadata .release ();
394+ return Flux .error (t );
395+ }
396+ })
397+ .map (
398+ p -> {
399+ try {
400+ return unmarshaller .apply (p .sliceData ());
401+ } finally {
402+ p .release ();
403+ }
404+ })
405+ .transform (metrics )
406+ .transform (tracing .apply (map ));
361407 }
362408
363409 private <X , Y > Flux <Y > doRequestChannel (
@@ -373,18 +419,28 @@ private <X, Y> Flux<Y> doRequestChannel(
373419 tracing ) {
374420 try {
375421
376- HashMap <String , String > map = new HashMap <>();
422+ final HashMap <String , String > map = new HashMap <>();
377423
378424 Flux <Payload > input =
379425 Flux .from (pub )
380426 .map (
381- o -> {
382- ByteBuf d = marshaller .apply (o );
383- ByteBuf t = Tracing .mapToByteBuf (ByteBufAllocator .DEFAULT , map );
384- ByteBuf m =
385- Metadata .encode (ByteBufAllocator .DEFAULT , service , route , t , metadata );
386-
387- return ByteBufPayload .create (d , m );
427+ new Function <X , Payload >() {
428+ boolean first = true ;
429+
430+ @ Override
431+ public Payload apply (X o ) {
432+ ByteBuf d = marshaller .apply (o );
433+ if (first ) {
434+ first = false ;
435+ ByteBuf m =
436+ metadataEncoder .encode (
437+ metadata , new SimpleSpanContext (map ), service , route );
438+ metadata .release ();
439+ return ByteBufPayload .create (d , m );
440+ }
441+
442+ return ByteBufPayload .create (d );
443+ }
388444 });
389445
390446 return r .requestChannel (input )
0 commit comments