Skip to content

Commit 8fd09f5

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #82 from reactive-commons/feature/spring-boot-3
Feature/spring boot 3
2 parents fd0c01a + 589b3de commit 8fd09f5

File tree

20 files changed

+102
-75
lines changed

20 files changed

+102
-75
lines changed

.github/workflows/main.yml

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ on:
1515
jobs:
1616
build:
1717
if: github.event_name != 'release'
18-
runs-on: ubuntu-18.04
18+
runs-on: ubuntu-latest
1919
steps:
20-
- uses: actions/checkout@v2
20+
- uses: actions/checkout@v3
2121
with:
2222
fetch-depth: 0
23-
- name: Set up JDK 11
24-
uses: actions/setup-java@v1
23+
- name: Set up JDK 17
24+
uses: actions/setup-java@v3
2525
with:
26-
java-version: 11
26+
distribution: temurin
27+
java-version: 17
2728
- name: Grant execute permission for gradlew
2829
run: chmod +x gradlew
2930
- name: Execute build test jacocoTestReport and sonar analysis
@@ -41,13 +42,14 @@ jobs:
4142

4243
release:
4344
if: github.event_name == 'release'
44-
runs-on: ubuntu-18.04
45+
runs-on: ubuntu-latest
4546
steps:
46-
- uses: actions/checkout@v2
47-
- name: Set up JDK 1.8
48-
uses: actions/setup-java@v1
47+
- uses: actions/checkout@v3
48+
- name: Set up JDK 17
49+
uses: actions/setup-java@v3
4950
with:
50-
java-version: 1.8
51+
distribution: temurin
52+
java-version: 17
5153
- name: Grant execute permission for gradlew
5254
run: chmod +x gradlew
5355
# - name: Execute jacocoTestReport
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
dependencies {
2-
compile project(":async-commons-rabbit-starter")
3-
compile('org.springframework.boot:spring-boot-starter')
2+
implementation project(':async-commons-rabbit-starter')
3+
implementation 'org.springframework.boot:spring-boot-starter'
44
}
55

66
test.onlyIf { false }

async/async-commons/async-commons.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ dependencies {
88
api project(':domain-events-api')
99

1010
compileOnly 'io.projectreactor:reactor-core'
11-
api 'io.projectreactor.rabbitmq:reactor-rabbitmq'
11+
api "io.projectreactor.rabbitmq:reactor-rabbitmq:${reactorRabbitVersion}"
1212
api 'com.fasterxml.jackson.core:jackson-databind'
1313
implementation 'commons-io:commons-io:2.11.0'
1414

async/async-commons/src/main/java/org/reactivecommons/async/commons/reply/ReactiveReplyRouter.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,34 @@
22

33
import org.reactivecommons.async.commons.communications.Message;
44
import reactor.core.publisher.Mono;
5-
import reactor.core.publisher.UnicastProcessor;
6-
import reactor.util.concurrent.Queues;
5+
import reactor.core.publisher.Sinks;
76

87
import java.util.concurrent.ConcurrentHashMap;
98

109
public class ReactiveReplyRouter {
11-
private final ConcurrentHashMap<String, UnicastProcessor<Message>> processors = new ConcurrentHashMap<>();
10+
private final ConcurrentHashMap<String, Sinks.One<Message>> processors = new ConcurrentHashMap<>();
1211

1312
public Mono<Message> register(String correlationID) {
14-
final UnicastProcessor<Message> processor = UnicastProcessor.create(Queues.<Message>one().get());
13+
final Sinks.One<Message> processor = Sinks.one();
1514
processors.put(correlationID, processor);
16-
return processor.singleOrEmpty();
15+
return processor.asMono();
1716
}
1817

1918
public void routeReply(String correlationID, Message data) {
20-
final UnicastProcessor<Message> processor = processors.remove(correlationID);
19+
final Sinks.One<Message> processor = processors.remove(correlationID);
2120
if (processor != null) {
22-
processor.onNext(data);
23-
processor.onComplete();
21+
processor.tryEmitValue(data);
2422
}
2523
}
2624

27-
public void deregister(String correlationID){
25+
public void deregister(String correlationID) {
2826
processors.remove(correlationID);
2927
}
3028

3129
public void routeEmpty(String correlationID) {
32-
final UnicastProcessor<Message> processor = processors.remove(correlationID);
30+
final Sinks.One<Message> processor = processors.remove(correlationID);
3331
if (processor != null) {
34-
processor.onComplete();
32+
processor.tryEmitEmpty();
3533
}
3634
}
3735
}

async/async-commons/src/test/java/org/reactivecommons/async/commons/reply/ReactiveReplyRouterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
class ReactiveReplyRouterTest {
1313

14-
private ReactiveReplyRouter replyRouter = new ReactiveReplyRouter();
14+
private final ReactiveReplyRouter replyRouter = new ReactiveReplyRouter();
1515

1616
@Test
1717
void shouldRouteReply(){
@@ -50,4 +50,4 @@ void shouldDeRegisterProcessor(){
5050
.expectTimeout(Duration.ofSeconds(3)).verify();
5151
}
5252

53-
}
53+
}

async/async-rabbit-standalone/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.rabbit.config;
22

3+
import io.micrometer.core.instrument.MeterRegistry;
34
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
45
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
56
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
@@ -22,8 +23,9 @@ public DirectAsyncGatewayConfig(String directMessagesExchangeName, String appNam
2223
this.appName = appName;
2324
}
2425

25-
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter) throws Exception {
26-
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter);
26+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter,
27+
MeterRegistry meterRegistry) throws Exception {
28+
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter, meterRegistry);
2729
}
2830

2931
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package org.reactivecommons.async.rabbit.config;
22

3+
import io.micrometer.core.instrument.MeterRegistry;
4+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
35
import lombok.RequiredArgsConstructor;
6+
import org.reactivecommons.async.commons.config.BrokerConfig;
7+
import org.reactivecommons.async.commons.converters.MessageConverter;
8+
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
49
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
510
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
611
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
712
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
8-
import org.reactivecommons.async.commons.converters.MessageConverter;
913
import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener;
10-
import org.reactivecommons.async.commons.config.BrokerConfig;
11-
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
14+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1215
import org.springframework.context.annotation.Bean;
1316
import org.springframework.context.annotation.Configuration;
1417
import org.springframework.context.annotation.Import;
@@ -21,22 +24,25 @@ public class DirectAsyncGatewayConfig {
2124
private final BrokerConfigProps props;
2225

2326
@Bean
24-
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter) throws Exception {
25-
return new RabbitDirectAsyncGateway(config, router, rSender, props.getDirectMessagesExchangeName(), converter);
27+
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter, MeterRegistry meterRegistry) throws Exception {
28+
return new RabbitDirectAsyncGateway(config, router, rSender, props.getDirectMessagesExchangeName(), converter, meterRegistry);
2629
}
2730

2831
@Bean
29-
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
32+
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
3033
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, props.getReplyQueue());
3134
replyListener.startListening(config.getRoutingKey());
3235
return replyListener;
3336
}
3437

35-
3638
@Bean
3739
public ReactiveReplyRouter router() {
3840
return new ReactiveReplyRouter();
3941
}
4042

41-
43+
@Bean
44+
@ConditionalOnMissingBean(MeterRegistry.class)
45+
public MeterRegistry defaultRabbitMeterRegistry() {
46+
return new SimpleMeterRegistry();
47+
}
4248
}

async/async-rabbit/async-rabbit.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ ext {
44
}
55

66
dependencies {
7-
api project(":async-commons-api")
8-
api project(":domain-events-api")
9-
api project(":async-commons")
7+
api project(':async-commons-api')
8+
api project(':domain-events-api')
9+
api project(':async-commons')
1010

1111
api 'io.projectreactor:reactor-core'
12+
api 'io.projectreactor:reactor-core-micrometer'
1213
api 'io.projectreactor.rabbitmq:reactor-rabbitmq'
1314
api 'com.rabbitmq:amqp-client'
1415
api 'com.fasterxml.jackson.core:jackson-databind'

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.rabbit;
22

3+
import io.micrometer.core.instrument.MeterRegistry;
34
import org.reactivecommons.api.domain.Command;
45
import org.reactivecommons.async.api.AsyncQuery;
56
import org.reactivecommons.async.api.DirectAsyncGateway;
@@ -8,6 +9,7 @@
89
import org.reactivecommons.async.commons.converters.MessageConverter;
910
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
1011
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
12+
import reactor.core.observability.micrometer.Micrometer;
1113
import reactor.core.publisher.Flux;
1214
import reactor.core.publisher.Mono;
1315
import reactor.rabbitmq.OutboundMessageResult;
@@ -33,10 +35,11 @@ public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
3335
private final boolean persistentCommands;
3436
private final boolean persistentQueries;
3537
private final Duration replyTimeout;
38+
private final MeterRegistry meterRegistry;
3639

3740

3841
public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender,
39-
String exchange, MessageConverter converter) {
42+
String exchange, MessageConverter converter, MeterRegistry meterRegistry) {
4043
this.config = config;
4144
this.router = router;
4245
this.sender = sender;
@@ -45,6 +48,7 @@ public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router,
4548
this.persistentCommands = config.isPersistentCommands();
4649
this.persistentQueries = config.isPersistentQueries();
4750
this.replyTimeout = config.getReplyTimeout();
51+
this.meterRegistry = meterRegistry;
4852
}
4953

5054
@Override
@@ -77,7 +81,7 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
7781
.name("async_query")
7882
.tag("operation", query.getResource())
7983
.tag("target", targetName)
80-
.metrics();
84+
.tap(Micrometer.metrics(meterRegistry));
8185
}
8286

8387
@Override

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.micrometer.core.instrument.MeterRegistry;
6+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
57
import lombok.Data;
68
import org.junit.jupiter.api.Test;
79
import org.junit.jupiter.api.extension.ExtendWith;
@@ -12,22 +14,20 @@
1214
import org.reactivecommons.async.api.AsyncQuery;
1315
import org.reactivecommons.async.api.From;
1416
import org.reactivecommons.async.commons.communications.Message;
15-
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
16-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1717
import org.reactivecommons.async.commons.config.BrokerConfig;
1818
import org.reactivecommons.async.commons.converters.MessageConverter;
1919
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
20-
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
2120
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
21+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
22+
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
2223
import org.reactivestreams.Publisher;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
25-
import reactor.core.publisher.UnicastProcessor;
26+
import reactor.core.publisher.Sinks;
2627
import reactor.rabbitmq.OutboundMessage;
2728
import reactor.rabbitmq.OutboundMessageResult;
2829
import reactor.rabbitmq.Sender;
2930
import reactor.test.StepVerifier;
30-
import reactor.util.concurrent.Queues;
3131

3232
import java.time.Duration;
3333
import java.util.List;
@@ -40,9 +40,18 @@
4040
import java.util.stream.IntStream;
4141

4242
import static org.assertj.core.api.Assertions.assertThat;
43-
import static org.mockito.ArgumentMatchers.*;
44-
import static org.mockito.Mockito.*;
45-
import static org.reactivecommons.async.commons.Headers.*;
43+
import static org.mockito.ArgumentMatchers.any;
44+
import static org.mockito.ArgumentMatchers.anyBoolean;
45+
import static org.mockito.ArgumentMatchers.anyMap;
46+
import static org.mockito.ArgumentMatchers.anyString;
47+
import static org.mockito.ArgumentMatchers.eq;
48+
import static org.mockito.Mockito.mock;
49+
import static org.mockito.Mockito.times;
50+
import static org.mockito.Mockito.verify;
51+
import static org.mockito.Mockito.when;
52+
import static org.reactivecommons.async.commons.Headers.COMPLETION_ONLY_SIGNAL;
53+
import static org.reactivecommons.async.commons.Headers.CORRELATION_ID;
54+
import static org.reactivecommons.async.commons.Headers.REPLY_ID;
4655

4756

4857
@ExtendWith(MockitoExtension.class)
@@ -55,16 +64,18 @@ class RabbitDirectAsyncGatewayTest {
5564
private ReactiveReplyRouter router;
5665
@Mock
5766
private ReactiveMessageSender senderMock;
67+
68+
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
5869
private RabbitDirectAsyncGateway asyncGateway;
5970

6071
public void init(ReactiveMessageSender sender) {
61-
asyncGateway = new RabbitDirectAsyncGateway(config, router, sender, "exchange", converter);
72+
asyncGateway = new RabbitDirectAsyncGateway(config, router, sender, "exchange", converter, meterRegistry);
6273
}
6374

6475
@Test
6576
void shouldReleaseRouterResourcesOnTimeout() {
6677
BrokerConfig config = new BrokerConfig(false, false, false, Duration.ofSeconds(1));
67-
asyncGateway = new RabbitDirectAsyncGateway(config, router, senderMock, "ex", converter);
78+
asyncGateway = new RabbitDirectAsyncGateway(config, router, senderMock, "ex", converter, meterRegistry);
6879
when(router.register(anyString())).thenReturn(Mono.never());
6980
when(senderMock.sendNoConfirm(any(), anyString(), anyString(), anyMap(), anyBoolean()))
7081
.thenReturn(Mono.empty());
@@ -179,10 +190,9 @@ private void mockReply() throws JsonProcessingException {
179190
Message message = mock(Message.class);
180191
ObjectMapper mapper = new ObjectMapper();
181192
when(message.getBody()).thenReturn(mapper.writeValueAsString(new DummyMessage()).getBytes());
182-
final UnicastProcessor<Message> processor = UnicastProcessor.create(Queues.<Message>one().get());
183-
processor.onNext(message);
184-
processor.onComplete();
185-
when(router.register(anyString())).thenReturn(processor.singleOrEmpty());
193+
final Sinks.One<Message> processor = Sinks.one();
194+
processor.tryEmitValue(message);
195+
when(router.register(anyString())).thenReturn(processor.asMono());
186196
}
187197

188198
private ReactiveMessageSender getReactiveMessageSender() {

0 commit comments

Comments
 (0)