Skip to content

Commit 11d6efe

Browse files
committed
[broker-14] add TopicSubscriberTest
add lazy initialization of TopicSubscribers structure
1 parent 88ef60b commit 11d6efe

File tree

16 files changed

+339
-113
lines changed

16 files changed

+339
-113
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.5.0"
35+
rlibVersion = "9.6.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'

src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ss.mqtt.broker.model;
22

3+
import com.ss.mqtt.broker.model.topic.TopicFilter;
34
import lombok.Getter;
45
import lombok.RequiredArgsConstructor;
56

@@ -8,7 +9,7 @@
89
public class SubscribeTopicFilter {
910

1011
/**
11-
* The subscriber's topic name.
12+
* The subscriber's topic filter.
1213
*/
1314
private final TopicFilter topicFilter;
1415

src/main/java/com/ss/mqtt/broker/model/TopicFilter.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

src/main/java/com/ss/mqtt/broker/model/AbstractTopic.java renamed to src/main/java/com/ss/mqtt/broker/model/topic/AbstractTopic.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ss.mqtt.broker.model;
1+
package com.ss.mqtt.broker.model.topic;
22

33
import com.ss.rlib.common.util.StringUtils;
44
import lombok.Getter;
@@ -11,8 +11,8 @@ abstract class AbstractTopic {
1111
static final String MULTI_LEVEL_WILDCARD = "#";
1212
static final String SINGLE_LEVEL_WILDCARD = "+";
1313

14-
protected final int length;
15-
protected final String[] levels;
14+
private final int length;
15+
private final String[] levels;
1616
private final String string;
1717

1818
AbstractTopic() {
@@ -40,4 +40,16 @@ public String toString() {
4040
return string;
4141
}
4242

43+
String getSegment(int level) {
44+
return levels[level];
45+
}
46+
47+
int size() {
48+
return levels.length;
49+
}
50+
51+
int lastLevel() {
52+
return levels.length - 1;
53+
}
54+
4355
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.ss.mqtt.broker.model.topic;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class TopicFilter extends AbstractTopic {
6+
7+
public TopicFilter(@NotNull String topicName) {
8+
super(topicName);
9+
10+
int multiPos = topicName.indexOf(MULTI_LEVEL_WILDCARD);
11+
if (multiPos != -1 && multiPos != topicName.length() - 1) {
12+
throw new IllegalArgumentException("Multi level wildcard is incorrectly used: " + topicName);
13+
}
14+
if (topicName.contains("++")) {
15+
throw new IllegalArgumentException("Single level wildcard is incorrectly used: " + topicName);
16+
}
17+
}
18+
19+
public boolean matches(@NotNull TopicName topicName) {
20+
21+
if ((size() < topicName.size() && !getSegment(lastLevel()).equals(MULTI_LEVEL_WILDCARD)) ||
22+
(topicName.size() < size() && !topicName.getSegment(lastLevel()).equals(MULTI_LEVEL_WILDCARD))) {
23+
return false;
24+
}
25+
26+
int maxLength = Math.min(size(), topicName.size());
27+
for (int level = 0; level < maxLength; level++) {
28+
if (!getSegment(level).equals(topicName.getSegment(level)) &&
29+
!getSegment(level).equals(MULTI_LEVEL_WILDCARD) &&
30+
!topicName.getSegment(level).equals(MULTI_LEVEL_WILDCARD) &&
31+
!(getSegment(level).equals(SINGLE_LEVEL_WILDCARD) ||
32+
topicName.getSegment(level).equals(SINGLE_LEVEL_WILDCARD))) {
33+
return false;
34+
}
35+
}
36+
return true;
37+
}
38+
}
39+

src/main/java/com/ss/mqtt/broker/model/TopicName.java renamed to src/main/java/com/ss/mqtt/broker/model/topic/TopicName.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ss.mqtt.broker.model;
1+
package com.ss.mqtt.broker.model.topic;
22

33
import org.jetbrains.annotations.NotNull;
44

src/main/java/com/ss/mqtt/broker/model/TopicSubscriber.java renamed to src/main/java/com/ss/mqtt/broker/model/topic/TopicSubscribers.java

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1-
package com.ss.mqtt.broker.model;
1+
package com.ss.mqtt.broker.model.topic;
22

3-
import static com.ss.mqtt.broker.model.AbstractTopic.MULTI_LEVEL_WILDCARD;
4-
import static com.ss.mqtt.broker.model.AbstractTopic.SINGLE_LEVEL_WILDCARD;
3+
import static com.ss.mqtt.broker.model.topic.AbstractTopic.MULTI_LEVEL_WILDCARD;
4+
import static com.ss.mqtt.broker.model.topic.AbstractTopic.SINGLE_LEVEL_WILDCARD;
5+
import com.ss.mqtt.broker.model.Subscriber;
56
import com.ss.mqtt.broker.network.client.MqttClient;
7+
import com.ss.rlib.common.function.NotNullSupplier;
68
import com.ss.rlib.common.util.array.Array;
79
import com.ss.rlib.common.util.array.ConcurrentArray;
810
import com.ss.rlib.common.util.dictionary.ConcurrentObjectDictionary;
911
import com.ss.rlib.common.util.dictionary.DictionaryFactory;
1012
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
1113
import org.jetbrains.annotations.NotNull;
14+
import org.jetbrains.annotations.Nullable;
1215

13-
public class TopicSubscriber {
16+
public class TopicSubscribers {
17+
18+
private final static NotNullSupplier<TopicSubscribers> TOPIC_SUBSCRIBER_SUPPLIER = TopicSubscribers::new;
1419

1520
private static boolean filterByQos(@NotNull Array<Subscriber> subscribers, @NotNull Subscriber candidate) {
1621
var existed = subscribers.findAny(candidate, Subscriber::equals);
@@ -24,38 +29,38 @@ private static boolean filterByQos(@NotNull Array<Subscriber> subscribers, @NotN
2429
}
2530
}
2631

27-
private static TopicSubscriber collectSubscribers(
28-
@NotNull ConcurrentObjectDictionary<String, TopicSubscriber> topicSubscribers,
32+
private static TopicSubscribers collectSubscribers(
33+
@NotNull ConcurrentObjectDictionary<String, TopicSubscribers> topicSubscribers,
2934
@NotNull String topicName,
3035
@NotNull Array<Subscriber> resultSubscribers
3136
) {
3237
var ts = topicSubscribers.get(topicName);
3338
if (ts != null) {
34-
long stamp = ts.subscribers.readLock();
39+
long stamp = ts.getSubscribers().readLock();
3540
try {
36-
ts.subscribers.forEachFiltered(resultSubscribers, TopicSubscriber::filterByQos, Array::add);
41+
ts.getSubscribers().forEachFiltered(resultSubscribers, TopicSubscribers::filterByQos, Array::add);
3742
} finally {
38-
ts.subscribers.readUnlock(stamp);
43+
ts.getSubscribers().readUnlock(stamp);
3944
}
4045
}
4146
return ts;
4247
}
4348

44-
private final ConcurrentObjectDictionary<String, TopicSubscriber> topicSubscribers =
45-
DictionaryFactory.newConcurrentStampedLockObjectDictionary();
46-
private final ConcurrentArray<Subscriber> subscribers = ConcurrentArray.ofType(Subscriber.class);
49+
private @Nullable ConcurrentObjectDictionary<String, TopicSubscribers> topicSubscribers;
50+
private @Nullable ConcurrentArray<Subscriber> subscribers;
4751

4852
public void addSubscriber(@NotNull TopicFilter topicFilter, @NotNull Subscriber subscriber) {
4953
addSubscriber(0, topicFilter, subscriber);
5054
}
5155

5256
private void addSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull Subscriber subscriber) {
53-
if (level == topicFilter.levels.length) {
54-
subscribers.runInWriteLock(subscriber, ConcurrentArray::add);
57+
if (level == topicFilter.size()) {
58+
getSubscribers().runInWriteLock(subscriber, ConcurrentArray::add);
5559
} else {
56-
var topicSubscriber = topicSubscribers.getInWriteLock(
57-
topicFilter.levels[level],
58-
(ts, topic) -> ts.getOrCompute(topic, TopicSubscriber::new)
60+
var topicSubscriber = getTopicSubscribers().getInWriteLock(
61+
topicFilter.getSegment(level),
62+
TOPIC_SUBSCRIBER_SUPPLIER,
63+
ObjectDictionary::getOrCompute
5964
);
6065
//noinspection ConstantConditions
6166
topicSubscriber.addSubscriber(level + 1, topicFilter, subscriber);
@@ -67,10 +72,13 @@ public boolean removeSubscriber(@NotNull TopicFilter topicFilter, @NotNull MqttC
6772
}
6873

6974
private boolean removeSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
70-
if (level == topicFilter.levels.length) {
71-
return subscribers.removeIfInWriteLock(mqttClient, (client, subscriber) -> client.equals(subscriber.getMqttClient()));
75+
if (level == topicFilter.size()) {
76+
return getSubscribers().removeConvertedIfInWriteLock(mqttClient, Subscriber::getMqttClient, Object::equals);
7277
} else {
73-
var topicSubscriber = topicSubscribers.getInReadLock(topicFilter.levels[level], ObjectDictionary::get);
78+
var topicSubscriber = getTopicSubscribers().getInReadLock(
79+
topicFilter.getSegment(level),
80+
ObjectDictionary::get
81+
);
7482
if (topicSubscriber == null) {
7583
return false;
7684
} else {
@@ -81,7 +89,7 @@ private boolean removeSubscriber(int level, @NotNull TopicFilter topicFilter, @N
8189

8290
public @NotNull Array<Subscriber> matches(@NotNull TopicName topicName) {
8391
var resultArray = Array.ofType(Subscriber.class);
84-
processLevel(0, topicName.levels[0], topicName, resultArray);
92+
processLevel(0, topicName.getSegment(0), topicName, resultArray);
8593
return resultArray;
8694
}
8795

@@ -103,14 +111,36 @@ private void processSegment(
103111
@NotNull TopicName topicName,
104112
@NotNull Array<Subscriber> resultSubscribers
105113
) {
106-
var topicSubscriber = topicSubscribers.getInReadLock(
114+
var topicSubscriber = getTopicSubscribers().getInReadLock(
107115
segment,
108116
resultSubscribers,
109-
TopicSubscriber::collectSubscribers
117+
TopicSubscribers::collectSubscribers
110118
);
111-
if (topicSubscriber != null && nextLevel < topicName.levels.length) {
112-
var nextSegment = topicName.levels[nextLevel];
119+
if (topicSubscriber != null && nextLevel < topicName.size()) {
120+
var nextSegment = topicName.getSegment(nextLevel);
113121
topicSubscriber.processLevel(nextLevel, nextSegment, topicName, resultSubscribers);
114122
}
115123
}
124+
125+
private @NotNull ConcurrentObjectDictionary<String, TopicSubscribers> getTopicSubscribers() {
126+
if (topicSubscribers == null) {
127+
synchronized (this) {
128+
if (topicSubscribers == null) {
129+
topicSubscribers = DictionaryFactory.newConcurrentStampedLockObjectDictionary();
130+
}
131+
}
132+
}
133+
return topicSubscribers;
134+
}
135+
136+
private @NotNull ConcurrentArray<Subscriber> getSubscribers() {
137+
if (subscribers == null) {
138+
synchronized (this) {
139+
if (subscribers == null) {
140+
subscribers = ConcurrentArray.ofType(Subscriber.class);
141+
}
142+
}
143+
}
144+
return subscribers;
145+
}
116146
}

src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

3-
import static com.ss.mqtt.broker.model.TopicName.EMPTY_TOPIC_NAME;
3+
import static com.ss.mqtt.broker.model.topic.TopicName.EMPTY_TOPIC_NAME;
44
import com.ss.mqtt.broker.model.MqttPropertyConstants;
55
import com.ss.mqtt.broker.model.PacketProperty;
66
import com.ss.mqtt.broker.model.QoS;
7-
import com.ss.mqtt.broker.model.TopicName;
7+
import com.ss.mqtt.broker.model.topic.TopicName;
88
import com.ss.mqtt.broker.network.MqttConnection;
99
import com.ss.mqtt.broker.network.packet.PacketType;
1010
import com.ss.rlib.common.util.ArrayUtils;

src/main/java/com/ss/mqtt/broker/network/packet/in/SubscribeInPacket.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

33
import com.ss.mqtt.broker.model.*;
4+
import com.ss.mqtt.broker.model.topic.TopicFilter;
45
import com.ss.mqtt.broker.network.MqttConnection;
56
import com.ss.mqtt.broker.network.packet.PacketType;
67
import com.ss.rlib.common.util.NumberUtils;

src/main/java/com/ss/mqtt/broker/network/packet/in/UnsubscribeInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

33
import com.ss.mqtt.broker.model.PacketProperty;
4-
import com.ss.mqtt.broker.model.TopicFilter;
4+
import com.ss.mqtt.broker.model.topic.TopicFilter;
55
import com.ss.mqtt.broker.network.MqttConnection;
66
import com.ss.mqtt.broker.network.packet.PacketType;
77
import com.ss.rlib.common.util.array.Array;

0 commit comments

Comments
 (0)