Skip to content

Commit a98509b

Browse files
authored
Merge pull request #29 from JavaSaBr/feature-broker-25
[broker-25] Update publish re-try mechanism
2 parents 1464ea5 + 13e7e6a commit a98509b

File tree

109 files changed

+2076
-704
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+2076
-704
lines changed

build.gradle

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.8.0"
35+
rlibVersion = "9.9.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'
@@ -43,15 +43,17 @@ allprojects {
4343
projectReactorVersion = "3.3.0.RELEASE"
4444
byteBuddyVersion = "1.10.2"
4545
objenesisVersion = "3.1"
46+
gsonVersion = "2.8.6"
4647
}
4748

4849
dependencies {
4950

5051
implementation "com.spaceshift:rlib.network:$rlibVersion"
5152
implementation "com.spaceshift:rlib.logger.slf4j:$rlibVersion"
5253
implementation "org.springframework.boot:spring-boot-starter:$springbootVersion"
53-
implementation "org.springframework.boot:spring-boot-starter-logging:$springbootVersion"
54+
implementation "org.springframework.boot:spring-boot-starter-log4j2:$springbootVersion"
5455
implementation "io.projectreactor:reactor-core:$projectReactorVersion"
56+
implementation "com.google.code.gson:gson:$gsonVersion"
5557

5658
compileOnly "org.jetbrains:annotations:$annotationVersion"
5759
compileOnly "org.projectlombok:lombok:$lombokVersion"
@@ -122,4 +124,5 @@ wrapper {
122124

123125
configurations.each {
124126
it.exclude group: "org.slf4j", module: "slf4j-log4j12"
127+
it.exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
125128
}
Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
package com.ss.mqtt.broker;
22

33
import com.ss.mqtt.broker.config.MqttBrokerConfig;
4-
import com.ss.rlib.common.concurrent.util.ConcurrentUtils;
54
import lombok.RequiredArgsConstructor;
65
import org.jetbrains.annotations.NotNull;
76
import org.springframework.boot.SpringApplication;
87
import org.springframework.context.annotation.Configuration;
98
import org.springframework.context.annotation.Import;
109

11-
@Import({
12-
MqttBrokerConfig.class
13-
})
1410
@Configuration
1511
@RequiredArgsConstructor
12+
@Import(MqttBrokerConfig.class)
1613
public class MqttBrokerApplication {
1714

1815
public static void main(@NotNull String[] args) {
19-
ConcurrentUtils.wait(SpringApplication.run(MqttBrokerApplication.class, args));
16+
SpringApplication.run(MqttBrokerApplication.class, args);
2017
}
2118
}

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private interface ChannelFactory extends
6262
return new InMemoryClientIdRegistry(
6363
env.getProperty(
6464
"client.id.available.chars",
65-
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-"
65+
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
6666
),
6767
env.getProperty("client.id.max.length", int.class, 36)
6868
);
@@ -94,16 +94,14 @@ private interface ChannelFactory extends
9494
@NotNull ClientIdRegistry clientIdRegistry,
9595
@NotNull SubscriptionService subscriptionService,
9696
@NotNull PublishingService publishingService,
97-
@NotNull MqttSessionService mqttSessionService,
98-
@NotNull PublishRetryService publishRetryService
97+
@NotNull MqttSessionService mqttSessionService
9998
) {
10099

101100
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
102101
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
103102
clientIdRegistry,
104103
authenticationService,
105104
mqttSessionService,
106-
publishRetryService,
107105
subscriptionService
108106
);
109107
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
@@ -122,13 +120,11 @@ private interface ChannelFactory extends
122120
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
123121
@NotNull ClientIdRegistry clientIdRegistry,
124122
@NotNull MqttSessionService mqttSessionService,
125-
@NotNull PublishRetryService publishRetryService,
126123
@NotNull SubscriptionService subscriptionService
127124
) {
128125
return new DefaultMqttClientReleaseHandler(
129126
clientIdRegistry,
130127
mqttSessionService,
131-
publishRetryService,
132128
subscriptionService
133129
);
134130
}
@@ -152,14 +148,6 @@ private interface ChannelFactory extends
152148
);
153149
}
154150

155-
@Bean
156-
@NotNull PublishRetryService publishRetryService() {
157-
return new DefaultPublishRetryService(
158-
env.getProperty("publish.pending.check.interval", int.class, 60 * 1000),
159-
env.getProperty("publish.retry.interval", int.class, 60 * 1000)
160-
);
161-
}
162-
163151
@Bean
164152
@NotNull InetSocketAddress deviceNetworkAddress(
165153
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
2626
@NotNull byte[] authenticationData,
2727
@NotNull Array<StringPair> userProperties
2828
) {
29-
return new ConnectAck311OutPacket(client, reasonCode, sessionPresent);
29+
return new ConnectAck311OutPacket(reasonCode, sessionPresent);
3030
}
3131

3232
@Override
3333
public @NotNull PublishOutPacket newPublish(
34-
@NotNull MqttClient client,
3534
int packetId,
3635
@NotNull QoS qos,
3736
boolean retained,
@@ -44,40 +43,44 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
4443
@NotNull byte[] correlationData,
4544
@NotNull Array<StringPair> userProperties
4645
) {
47-
return newPublish(client, packetId, qos, retained, duplicate, topicName, payload);
46+
return new Publish311OutPacket(
47+
packetId,
48+
qos,
49+
retained,
50+
duplicate,
51+
topicName,
52+
payload
53+
);
4854
}
4955

5056
@Override
5157
public @NotNull MqttWritablePacket newPublishAck(
52-
@NotNull MqttClient client,
5358
int packetId,
5459
@NotNull PublishAckReasonCode reasonCode,
5560
@NotNull String reason,
5661
@NotNull Array<StringPair> userProperties
5762
) {
58-
return new PublishAck311OutPacket(client, packetId);
63+
return new PublishAck311OutPacket(packetId);
5964
}
6065

6166
@Override
6267
public @NotNull MqttWritablePacket newSubscribeAck(
63-
@NotNull MqttClient client,
6468
int packetId,
6569
@NotNull Array<SubscribeAckReasonCode> reasonCodes,
6670
@NotNull String reason,
6771
@NotNull Array<StringPair> userProperties
6872
) {
69-
return new SubscribeAck311OutPacket(client, packetId, reasonCodes);
73+
return new SubscribeAck311OutPacket(reasonCodes, packetId);
7074
}
7175

7276
@Override
7377
public @NotNull MqttWritablePacket newUnsubscribeAck(
74-
@NotNull MqttClient client,
7578
int packetId,
7679
@NotNull Array<UnsubscribeAckReasonCode> reasonCodes,
7780
@NotNull Array<StringPair> userProperties,
7881
@NotNull String reason
7982
) {
80-
return new UnsubscribeAck311OutPacket(client, packetId);
83+
return new UnsubscribeAck311OutPacket(packetId);
8184
}
8285

8386
@Override
@@ -88,12 +91,11 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
8891
@NotNull String reason,
8992
@NotNull String serverReference
9093
) {
91-
return new Disconnect311OutPacket(client);
94+
return new Disconnect311OutPacket();
9295
}
9396

9497
@Override
9598
public @NotNull MqttWritablePacket newAuthenticate(
96-
@NotNull MqttClient client,
9799
@NotNull AuthenticateReasonCode reasonCode,
98100
@NotNull String authenticateMethod,
99101
@NotNull byte[] authenticateData,
@@ -104,45 +106,42 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
104106
}
105107

106108
@Override
107-
public @NotNull MqttWritablePacket newPingRequest(@NotNull MqttClient client) {
108-
return new PingRequest311OutPacket(client);
109+
public @NotNull MqttWritablePacket newPingRequest() {
110+
return new PingRequest311OutPacket();
109111
}
110112

111113
@Override
112-
public @NotNull MqttWritablePacket newPingResponse(@NotNull MqttClient client) {
113-
return new PingResponse311OutPacket(client);
114+
public @NotNull MqttWritablePacket newPingResponse() {
115+
return new PingResponse311OutPacket();
114116
}
115117

116118
@Override
117119
public @NotNull MqttWritablePacket newPublishRelease(
118-
@NotNull MqttClient client,
119120
int packetId,
120121
@NotNull PublishReleaseReasonCode reasonCode,
121122
@NotNull Array<StringPair> userProperties,
122123
@NotNull String reason
123124
) {
124-
return new PublishRelease311OutPacket(client, packetId);
125+
return new PublishRelease311OutPacket(packetId);
125126
}
126127

127128
@Override
128129
public @NotNull MqttWritablePacket newPublishReceived(
129-
@NotNull MqttClient client,
130130
int packetId,
131131
@NotNull PublishReceivedReasonCode reasonCode,
132132
@NotNull Array<StringPair> userProperties,
133133
@NotNull String reason
134134
) {
135-
return new PublishReceived311OutPacket(client, packetId);
135+
return new PublishReceived311OutPacket(packetId);
136136
}
137137

138138
@Override
139139
public @NotNull MqttWritablePacket newPublishCompleted(
140-
@NotNull MqttClient client,
141140
int packetId,
142141
@NotNull PublishCompletedReasonCode reasonCode,
143142
@NotNull Array<StringPair> userProperties,
144143
@NotNull String reason
145144
) {
146-
return new PublishComplete311OutPacket(client, packetId);
145+
return new PublishComplete311OutPacket(packetId);
147146
}
148147
}

0 commit comments

Comments
 (0)