Skip to content

Commit 7bfbd70

Browse files
committed
[broker-14] work on code review
1 parent 3a595c0 commit 7bfbd70

18 files changed

+111
-98
lines changed

build.gradle

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,7 @@ allprojects {
9494
}
9595

9696
tasks.withType(GroovyCompile) {
97-
options.forkOptions.jvmArgs << "--enable-preview"
98-
}
99-
100-
tasks.withType(JavaCompile) {
101-
options.compilerArgs.add("--enable-preview")
97+
options.forkOptions.jvmArgs += "--enable-preview"
10298
}
10399

104100
processResources {

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler;
1414
import com.ss.mqtt.broker.model.MqttPropertyConstants;
1515
import com.ss.mqtt.broker.model.QoS;
16-
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
1716
import com.ss.mqtt.broker.network.MqttConnection;
1817
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
1918
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
@@ -69,11 +68,6 @@ private interface ChannelFactory extends
6968
);
7069
}
7170

72-
@Bean
73-
@NotNull TopicSubscribers topicSubscribers() {
74-
return new TopicSubscribers();
75-
}
76-
7771
@Bean
7872
@NotNull MqttSessionService mqttSessionService() {
7973
return new InMemoryMqttSessionService(
@@ -101,8 +95,7 @@ private interface ChannelFactory extends
10195
@NotNull SubscriptionService subscriptionService,
10296
@NotNull PublishingService publishingService,
10397
@NotNull MqttSessionService mqttSessionService,
104-
@NotNull PublishRetryService publishRetryService,
105-
@NotNull TopicSubscribers topicSubscribers
98+
@NotNull PublishRetryService publishRetryService
10699
) {
107100

108101
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
@@ -111,7 +104,7 @@ private interface ChannelFactory extends
111104
authenticationService,
112105
mqttSessionService,
113106
publishRetryService,
114-
topicSubscribers
107+
subscriptionService
115108
);
116109
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
117110
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -127,13 +120,13 @@ private interface ChannelFactory extends
127120
@NotNull ClientIdRegistry clientIdRegistry,
128121
@NotNull MqttSessionService mqttSessionService,
129122
@NotNull PublishRetryService publishRetryService,
130-
@NotNull TopicSubscribers topicSubscribers
123+
@NotNull SubscriptionService subscriptionService
131124
) {
132125
return new DefaultMqttClientReleaseHandler(
133126
clientIdRegistry,
134127
mqttSessionService,
135128
publishRetryService,
136-
topicSubscribers
129+
subscriptionService
137130
);
138131
}
139132

@@ -179,8 +172,8 @@ private interface ChannelFactory extends
179172
}
180173

181174
@Bean
182-
@NotNull SubscriptionService subscriptionService(@NotNull TopicSubscribers topicSubscribers) {
183-
return new SimpleSubscriptionService(topicSubscribers);
175+
@NotNull SubscriptionService subscriptionService() {
176+
return new SimpleSubscriptionService();
184177
}
185178

186179
@Bean

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package com.ss.mqtt.broker.handler.client;
22

3-
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
43
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
54
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
65
import com.ss.mqtt.broker.service.ClientIdRegistry;
76
import com.ss.mqtt.broker.service.MqttSessionService;
87
import com.ss.mqtt.broker.service.PublishRetryService;
8+
import com.ss.mqtt.broker.service.SubscriptionService;
99
import com.ss.rlib.common.util.StringUtils;
1010
import lombok.RequiredArgsConstructor;
1111
import lombok.extern.log4j.Log4j2;
@@ -20,7 +20,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
2020
private final @NotNull ClientIdRegistry clientIdRegistry;
2121
private final @NotNull MqttSessionService sessionService;
2222
private final @NotNull PublishRetryService publishRetryService;
23-
private final @NotNull TopicSubscribers topicSubscribers;
23+
private final @NotNull SubscriptionService subscriptionService;
2424

2525
@Override
2626
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -44,7 +44,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
4444
Mono<?> asyncActions = null;
4545

4646
if (session != null) {
47-
topicSubscribers.cleanSubscribers(client, session.getTopicFilters());
47+
subscriptionService.cleanSubscriptions(client, session);
4848
if (client.getConnectionConfig().isSessionsEnabled()) {
4949
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
5050
client.setSession(null);
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.ss.mqtt.broker.handler.client;
22

3-
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
43
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
54
import com.ss.mqtt.broker.service.ClientIdRegistry;
65
import com.ss.mqtt.broker.service.MqttSessionService;
76
import com.ss.mqtt.broker.service.PublishRetryService;
7+
import com.ss.mqtt.broker.service.SubscriptionService;
88
import org.jetbrains.annotations.NotNull;
99

1010
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
@@ -13,8 +13,8 @@ public DefaultMqttClientReleaseHandler(
1313
@NotNull ClientIdRegistry clientIdRegistry,
1414
@NotNull MqttSessionService sessionService,
1515
@NotNull PublishRetryService publishRetryService,
16-
@NotNull TopicSubscribers topicSubscribers
16+
@NotNull SubscriptionService subscriptionService
1717
) {
18-
super(clientIdRegistry, sessionService, publishRetryService, topicSubscribers);
18+
super(clientIdRegistry, sessionService, publishRetryService, subscriptionService);
1919
}
2020
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package com.ss.mqtt.broker.handler.packet.in;
22

3+
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
34
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
45
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;
5-
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
66
import static com.ss.mqtt.broker.util.ReactorUtils.ifTrue;
77
import com.ss.mqtt.broker.exception.ConnectionRejectException;
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
9-
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
109
import com.ss.mqtt.broker.model.MqttSession;
11-
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
10+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1211
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1312
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
1413
import com.ss.mqtt.broker.service.*;
@@ -24,7 +23,7 @@ public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
2423
private final @NotNull AuthenticationService authenticationService;
2524
private final @NotNull MqttSessionService mqttSessionService;
2625
private final @NotNull PublishRetryService publishRetryService;
27-
private final @NotNull TopicSubscribers topicSubscribers;
26+
private final @NotNull SubscriptionService subscriptionService;
2827

2928
@Override
3029
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -138,7 +137,7 @@ private Mono<Boolean> onConnected(
138137

139138
publishRetryService.register(client);
140139

141-
topicSubscribers.restoreSubscribers(client, session.getTopicFilters());
140+
subscriptionService.restoreSubscriptions(client, session);
142141

143142
return Mono.just(Boolean.TRUE);
144143
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet)
2222
packet,
2323
this::publish
2424
);
25-
handleImpl(client, packet, result);
25+
handleResult(client, packet, result);
2626
}
2727

2828
private boolean publish(
@@ -36,7 +36,7 @@ private boolean publish(
3636
return publishOutHandlers[qos.ordinal()];
3737
}
3838

39-
protected void handleImpl(
39+
protected void handleResult(
4040
@NotNull MqttClient client,
4141
@NotNull PublishInPacket packet,
4242
@NotNull ActionResult result

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ public Qos1PublishInHandler(
1818
}
1919

2020
@Override
21-
protected void handleImpl(
21+
protected void handleResult(
2222
@NotNull MqttClient client,
2323
@NotNull PublishInPacket packet,
2424
@NotNull ActionResult result
2525
) {
26-
PublishAckReasonCode reasonCode = switch (result) {
26+
var reasonCode = switch (result) {
2727
case EMPTY -> PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
2828
case SUCCESS -> PublishAckReasonCode.SUCCESS;
2929
default -> PublishAckReasonCode.UNSPECIFIED_ERROR;

src/main/java/com/ss/mqtt/broker/model/MqttSession.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package com.ss.mqtt.broker.model;
22

3+
import com.ss.mqtt.broker.model.topic.TopicFilter;
34
import com.ss.mqtt.broker.network.client.MqttClient;
45
import com.ss.mqtt.broker.network.packet.HasPacketId;
56
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
6-
import com.ss.rlib.common.util.array.ConcurrentArray;
7+
import com.ss.rlib.common.function.NotNullTripleConsumer;
78
import org.jetbrains.annotations.NotNull;
89

910
public interface MqttSession {
@@ -41,5 +42,12 @@ interface PendingPacketHandler {
4142

4243
void registerPendingPublish(@NotNull PublishInPacket publish, @NotNull PendingPacketHandler handler, int packetId);
4344
void updatePendingPacket(@NotNull MqttClient client, @NotNull HasPacketId response);
44-
@NotNull ConcurrentArray<SubscribeTopicFilter> getTopicFilters();
45+
46+
<F, S> void forEachTopicFilter(
47+
@NotNull F first,
48+
@NotNull S second,
49+
@NotNull NotNullTripleConsumer<F, S, SubscribeTopicFilter> consumer
50+
);
51+
void addSubscriber(@NotNull SubscribeTopicFilter subscribe);
52+
void removeSubscriber(@NotNull TopicFilter subscribe);
4553
}

src/main/java/com/ss/mqtt/broker/model/Subscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ public Subscriber(@NotNull MqttClient mqttClient, @NotNull SubscribeTopicFilter
2525
this.subscribeTopicFilter = topicFilter;
2626
}
2727

28-
public QoS getQos() {
28+
public @NotNull QoS getQos() {
2929
return subscribeTopicFilter.getQos();
3030
}
3131

32-
public TopicFilter getTopicFilter() {
32+
public @NotNull TopicFilter getTopicFilter() {
3333
return subscribeTopicFilter.getTopicFilter();
3434
}
3535

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import com.ss.mqtt.broker.model.MqttPropertyConstants;
44
import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession;
55
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
6+
import com.ss.mqtt.broker.model.topic.TopicFilter;
67
import com.ss.mqtt.broker.network.client.MqttClient;
78
import com.ss.mqtt.broker.network.packet.HasPacketId;
89
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
10+
import com.ss.rlib.common.function.NotNullTripleConsumer;
911
import com.ss.rlib.common.util.ClassUtils;
1012
import com.ss.rlib.common.util.array.Array;
1113
import com.ss.rlib.common.util.array.ConcurrentArray;
@@ -63,7 +65,7 @@ private static void removeExpiredPackets(@NotNull Array<PendingPublish> publishe
6365
private final @NotNull String clientId;
6466
private final @NotNull ConcurrentArray<PendingPublish> pendingPublishes;
6567
private final @NotNull AtomicInteger packetIdGenerator;
66-
private final @Getter @NotNull ConcurrentArray<SubscribeTopicFilter> topicFilters;
68+
private final @NotNull ConcurrentArray<SubscribeTopicFilter> topicFilters;
6769

6870
private volatile @Getter @Setter long expirationTime = -1;
6971

@@ -165,6 +167,29 @@ public void updatePendingPacket(
165167
}
166168
}
167169

170+
@Override
171+
public <F, S> void forEachTopicFilter(
172+
@NotNull F first,
173+
@NotNull S second,
174+
@NotNull NotNullTripleConsumer<F, S, SubscribeTopicFilter> consumer
175+
) {
176+
topicFilters.forEachInReadLock(first, second, consumer);
177+
}
178+
179+
@Override
180+
public void addSubscriber(@NotNull SubscribeTopicFilter subscribe) {
181+
topicFilters.runInWriteLock(subscribe, Collection::add);
182+
}
183+
184+
@Override
185+
public void removeSubscriber(@NotNull TopicFilter topicFilter) {
186+
topicFilters.removeIfConvertedInWriteLock(
187+
topicFilter,
188+
SubscribeTopicFilter::getTopicFilter,
189+
Object::equals
190+
);
191+
}
192+
168193
@Override
169194
public void clear() {
170195
pendingPublishes.runInWriteLock(Collection::clear);

0 commit comments

Comments
 (0)