Skip to content

Commit 4fee898

Browse files
committed
[broker-14] work according code review
1 parent 11d6efe commit 4fee898

File tree

11 files changed

+120
-120
lines changed

11 files changed

+120
-120
lines changed

src/main/java/com/ss/mqtt/broker/model/QoS.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,4 @@ public enum QoS {
2424
}
2525

2626
private final SubscribeAckReasonCode subscribeAckReasonCode;
27-
28-
public QoS nextQos() {
29-
int nextLevel = ordinal() + 1;
30-
if (nextLevel < INVALID.ordinal()) {
31-
return VALUES[nextLevel];
32-
} else {
33-
return EXACTLY_ONCE_DELIVERY;
34-
}
35-
}
3627
}

src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.ss.mqtt.broker.model.topic.TopicFilter;
44
import lombok.Getter;
55
import lombok.RequiredArgsConstructor;
6+
import org.jetbrains.annotations.NotNull;
67

78
@Getter
89
@RequiredArgsConstructor
@@ -11,20 +12,20 @@ public class SubscribeTopicFilter {
1112
/**
1213
* The subscriber's topic filter.
1314
*/
14-
private final TopicFilter topicFilter;
15+
private final @NotNull TopicFilter topicFilter;
1516

1617
/**
1718
* Maximum QoS field. This gives the maximum QoS level at which the Server
1819
* can send Application Messages to the Client.
1920
*/
20-
private final QoS qos;
21+
private final @NotNull QoS qos;
2122

2223
/**
2324
* This option specifies whether retained messages are sent when the subscription is established.
2425
* This does not affect the sending of retained messages at any point after the subscribe.
2526
* If there are no retained messages matching the Topic Filter, all of these values act the same.
2627
*/
27-
private final SubscribeRetainHandling retainHandling;
28+
private final @NotNull SubscribeRetainHandling retainHandling;
2829

2930
/**
3031
* If the value is true, Application Messages MUST NOT be forwarded to a connection with a ClientID equal
Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,53 @@
11
package com.ss.mqtt.broker.model.topic;
22

3+
import com.ss.rlib.common.util.ArrayUtils;
34
import com.ss.rlib.common.util.StringUtils;
5+
import com.ss.rlib.common.util.array.Array;
46
import lombok.Getter;
57
import org.jetbrains.annotations.NotNull;
68

79
@Getter
8-
abstract class AbstractTopic {
10+
public abstract class AbstractTopic {
911

1012
static final String DELIMITER = "/";
1113
static final String MULTI_LEVEL_WILDCARD = "#";
1214
static final String SINGLE_LEVEL_WILDCARD = "+";
1315

16+
static void checkTopic(@NotNull String topic) {
17+
if (topic.length() == 0) {
18+
throw new IllegalArgumentException("Topic has zero length.");
19+
} else if (topic.contains("//") || topic.startsWith("/") || topic.endsWith("/")) {
20+
throw new IllegalArgumentException("Topic has zero length level: " + topic);
21+
}
22+
}
23+
1424
private final int length;
15-
private final String[] levels;
16-
private final String string;
25+
private final String[] segments;
26+
private final String rawTopic;
1727

1828
AbstractTopic() {
1929
length = 0;
20-
levels = new String[0];
21-
string = StringUtils.EMPTY;
30+
segments = ArrayUtils.EMPTY_STRING_ARRAY;
31+
rawTopic = StringUtils.EMPTY;
2232
}
2333

2434
AbstractTopic(@NotNull String topicName) {
2535

2636
length = topicName.length();
27-
if (length == 0) {
28-
throw new IllegalArgumentException("Topic name has zero length.");
29-
}
30-
if (topicName.contains("//") || topicName.startsWith("/") || topicName.endsWith("/")) {
31-
throw new IllegalArgumentException("Topic name has zero length level: " + topicName);
32-
}
33-
34-
levels = topicName.split(DELIMITER);
35-
string = topicName;
37+
segments = topicName.split(DELIMITER);
38+
rawTopic = topicName;
3639
}
3740

3841
@Override
39-
public String toString() {
40-
return string;
41-
}
42-
43-
String getSegment(int level) {
44-
return levels[level];
42+
public @NotNull String toString() {
43+
return rawTopic;
4544
}
4645

47-
int size() {
48-
return levels.length;
46+
@NotNull String getSegment(int level) {
47+
return segments[level];
4948
}
5049

51-
int lastLevel() {
52-
return levels.length - 1;
50+
int levelsCount() {
51+
return segments.length;
5352
}
54-
5553
}

src/main/java/com/ss/mqtt/broker/model/topic/TopicFilter.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,19 @@
44

55
public class TopicFilter extends AbstractTopic {
66

7-
public TopicFilter(@NotNull String topicName) {
8-
super(topicName);
9-
10-
int multiPos = topicName.indexOf(MULTI_LEVEL_WILDCARD);
11-
if (multiPos != -1 && multiPos != topicName.length() - 1) {
12-
throw new IllegalArgumentException("Multi level wildcard is incorrectly used: " + topicName);
13-
}
14-
if (topicName.contains("++")) {
15-
throw new IllegalArgumentException("Single level wildcard is incorrectly used: " + topicName);
7+
public static TopicFilter from(@NotNull String topicFilter) {
8+
checkTopic(topicFilter);
9+
int multiPos = topicFilter.indexOf(MULTI_LEVEL_WILDCARD);
10+
if (multiPos != -1 && multiPos != topicFilter.length() - 1) {
11+
throw new IllegalArgumentException("Multi level wildcard is incorrectly used: " + topicFilter);
12+
} else if (topicFilter.contains("++")) {
13+
throw new IllegalArgumentException("Single level wildcard is incorrectly used: " + topicFilter);
1614
}
15+
return new TopicFilter(topicFilter);
1716
}
1817

19-
public boolean matches(@NotNull TopicName topicName) {
20-
21-
if ((size() < topicName.size() && !getSegment(lastLevel()).equals(MULTI_LEVEL_WILDCARD)) ||
22-
(topicName.size() < size() && !topicName.getSegment(lastLevel()).equals(MULTI_LEVEL_WILDCARD))) {
23-
return false;
24-
}
25-
26-
int maxLength = Math.min(size(), topicName.size());
27-
for (int level = 0; level < maxLength; level++) {
28-
if (!getSegment(level).equals(topicName.getSegment(level)) &&
29-
!getSegment(level).equals(MULTI_LEVEL_WILDCARD) &&
30-
!topicName.getSegment(level).equals(MULTI_LEVEL_WILDCARD) &&
31-
!(getSegment(level).equals(SINGLE_LEVEL_WILDCARD) ||
32-
topicName.getSegment(level).equals(SINGLE_LEVEL_WILDCARD))) {
33-
return false;
34-
}
35-
}
36-
return true;
18+
private TopicFilter(@NotNull String topicFilter) {
19+
super(topicFilter);
3720
}
3821
}
3922

src/main/java/com/ss/mqtt/broker/model/topic/TopicName.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
package com.ss.mqtt.broker.model.topic;
22

3+
import lombok.AccessLevel;
4+
import lombok.NoArgsConstructor;
35
import org.jetbrains.annotations.NotNull;
46

7+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
58
public class TopicName extends AbstractTopic {
69

710
public static final TopicName EMPTY_TOPIC_NAME = new TopicName();
811

9-
private TopicName() {
10-
}
11-
12-
public TopicName(@NotNull String topicName) {
13-
super(topicName);
14-
12+
public static @NotNull TopicName from(@NotNull String topicName) {
13+
checkTopic(topicName);
1514
if (topicName.contains(MULTI_LEVEL_WILDCARD)) {
1615
throw new IllegalArgumentException("Multi level wildcard is incorrectly used: " + topicName);
17-
}
18-
if (topicName.contains(SINGLE_LEVEL_WILDCARD)) {
16+
} else if (topicName.contains(SINGLE_LEVEL_WILDCARD)) {
1917
throw new IllegalArgumentException("Single level wildcard is incorrectly used: " + topicName);
2018
}
19+
return new TopicName(topicName);
20+
}
21+
22+
private TopicName(@NotNull String topicName) {
23+
super(topicName);
2124
}
2225

2326
}

src/main/java/com/ss/mqtt/broker/model/topic/TopicSubscribers.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.ss.rlib.common.util.array.Array;
99
import com.ss.rlib.common.util.array.ConcurrentArray;
1010
import com.ss.rlib.common.util.dictionary.ConcurrentObjectDictionary;
11-
import com.ss.rlib.common.util.dictionary.DictionaryFactory;
1211
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
1312
import org.jetbrains.annotations.NotNull;
1413
import org.jetbrains.annotations.Nullable;
@@ -17,31 +16,39 @@ public class TopicSubscribers {
1716

1817
private final static NotNullSupplier<TopicSubscribers> TOPIC_SUBSCRIBER_SUPPLIER = TopicSubscribers::new;
1918

20-
private static boolean filterByQos(@NotNull Array<Subscriber> subscribers, @NotNull Subscriber candidate) {
21-
var existed = subscribers.findAny(candidate, Subscriber::equals);
22-
if (existed == null) {
19+
private static boolean removeDuplicateWithLowerQoS(@NotNull Array<Subscriber> subscribers, @NotNull Subscriber candidate) {
20+
var found = subscribers.indexOf(candidate);
21+
if (found == -1) {
2322
return true;
2423
}
24+
var existed = subscribers.get(found);
2525
if (existed.getQos().ordinal() < candidate.getQos().ordinal()) {
26-
return subscribers.remove(existed);
26+
subscribers.fastRemove(found);
27+
return true;
2728
} else {
2829
return false;
2930
}
3031
}
3132

32-
private static TopicSubscribers collectSubscribers(
33+
private static @NotNull TopicSubscribers collectSubscribers(
3334
@NotNull ConcurrentObjectDictionary<String, TopicSubscribers> topicSubscribers,
3435
@NotNull String topicName,
3536
@NotNull Array<Subscriber> resultSubscribers
3637
) {
3738
var ts = topicSubscribers.get(topicName);
38-
if (ts != null) {
39-
long stamp = ts.getSubscribers().readLock();
40-
try {
41-
ts.getSubscribers().forEachFiltered(resultSubscribers, TopicSubscribers::filterByQos, Array::add);
42-
} finally {
43-
ts.getSubscribers().readUnlock(stamp);
44-
}
39+
if (ts == null) {
40+
return null;
41+
}
42+
var subscribers = ts.getSubscribers();
43+
long stamp = subscribers.readLock();
44+
try {
45+
subscribers.forEachFiltered(
46+
resultSubscribers,
47+
TopicSubscribers::removeDuplicateWithLowerQoS,
48+
Array::add
49+
);
50+
} finally {
51+
subscribers.readUnlock(stamp);
4552
}
4653
return ts;
4754
}
@@ -54,8 +61,8 @@ public void addSubscriber(@NotNull TopicFilter topicFilter, @NotNull Subscriber
5461
}
5562

5663
private void addSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull Subscriber subscriber) {
57-
if (level == topicFilter.size()) {
58-
getSubscribers().runInWriteLock(subscriber, ConcurrentArray::add);
64+
if (level == topicFilter.levelsCount()) {
65+
getSubscribers().runInWriteLock(subscriber, Array::add);
5966
} else {
6067
var topicSubscriber = getTopicSubscribers().getInWriteLock(
6168
topicFilter.getSegment(level),
@@ -72,8 +79,12 @@ public boolean removeSubscriber(@NotNull TopicFilter topicFilter, @NotNull MqttC
7279
}
7380

7481
private boolean removeSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
75-
if (level == topicFilter.size()) {
76-
return getSubscribers().removeConvertedIfInWriteLock(mqttClient, Subscriber::getMqttClient, Object::equals);
82+
if (level == topicFilter.levelsCount()) {
83+
return getSubscribers().removeConvertedIfInWriteLock(
84+
mqttClient,
85+
Subscriber::getMqttClient,
86+
Object::equals
87+
);
7788
} else {
7889
var topicSubscriber = getTopicSubscribers().getInReadLock(
7990
topicFilter.getSegment(level),
@@ -116,7 +127,7 @@ private void processSegment(
116127
resultSubscribers,
117128
TopicSubscribers::collectSubscribers
118129
);
119-
if (topicSubscriber != null && nextLevel < topicName.size()) {
130+
if (topicSubscriber != null && nextLevel < topicName.levelsCount()) {
120131
var nextSegment = topicName.getSegment(nextLevel);
121132
topicSubscriber.processLevel(nextLevel, nextSegment, topicName, resultSubscribers);
122133
}
@@ -126,7 +137,7 @@ private void processSegment(
126137
if (topicSubscribers == null) {
127138
synchronized (this) {
128139
if (topicSubscribers == null) {
129-
topicSubscribers = DictionaryFactory.newConcurrentStampedLockObjectDictionary();
140+
topicSubscribers = ConcurrentObjectDictionary.ofType(String.class, TopicSubscribers.class);
130141
}
131142
}
132143
}

src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public byte getPacketType() {
292292
@Override
293293
protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) {
294294
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039
295-
topicName = new TopicName(readString(buffer));
295+
topicName = TopicName.from(readString(buffer));
296296
packetId = qos != QoS.AT_MOST_ONCE_DELIVERY ? readUnsignedShort(buffer) : 0;
297297
}
298298

src/main/java/com/ss/mqtt/broker/network/packet/in/SubscribeInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ protected void readPayload(@NotNull MqttConnection connection, @NotNull ByteBuff
9090
var noLocal = !isMqtt5 || NumberUtils.isSetBit(options, 2);
9191
var rap = isMqtt5 && NumberUtils.isSetBit(options, 3);
9292

93-
topicFilters.add(new SubscribeTopicFilter(new TopicFilter(topicFilter), qos, retainHandling, noLocal, rap));
93+
topicFilters.add(new SubscribeTopicFilter(TopicFilter.from(topicFilter), qos, retainHandling, noLocal, rap));
9494
}
9595
}
9696

src/main/java/com/ss/mqtt/broker/network/packet/in/UnsubscribeInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected void readPayload(@NotNull MqttConnection connection, @NotNull ByteBuff
5555
}
5656

5757
while (buffer.hasRemaining()) {
58-
topicFilters.add(new TopicFilter(readString(buffer)));
58+
topicFilters.add(TopicFilter.from(readString(buffer)));
5959
}
6060
}
6161

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class SimpleSubscriptionService implements SubscriptionService {
4646
@NotNull Array<SubscribeTopicFilter> topicFilters
4747
) {
4848
return topicFilters.stream()
49-
.map(subscribeTopicFilter -> addSubscription(subscribeTopicFilter, mqttClient))
49+
.map(topicFilter -> addSubscription(topicFilter, mqttClient))
5050
.collect(ArrayCollectors.toArray(SubscribeAckReasonCode.class));
5151
}
5252

@@ -64,7 +64,7 @@ public class SimpleSubscriptionService implements SubscriptionService {
6464
@NotNull Array<TopicFilter> topicFilters
6565
) {
6666
return topicFilters.stream()
67-
.map(subscribeTopicFilter -> removeSubscription(subscribeTopicFilter, mqttClient))
67+
.map(topicFilter -> removeSubscription(topicFilter, mqttClient))
6868
.collect(ArrayCollectors.toArray(UnsubscribeAckReasonCode.class));
6969
}
7070

0 commit comments

Comments
 (0)