Skip to content

Commit 606ba57

Browse files
committed
[broker-14] implement topic tree
1 parent a911c24 commit 606ba57

File tree

13 files changed

+331
-159
lines changed

13 files changed

+331
-159
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.ss.mqtt.broker.model;
2+
3+
import com.ss.rlib.common.util.StringUtils;
4+
import lombok.Getter;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
@Getter
8+
abstract class AbstractTopic {
9+
10+
static final String DELIMITER = "/";
11+
static final String MULTI_LEVEL_WILDCARD = "#";
12+
static final String SINGLE_LEVEL_WILDCARD = "+";
13+
14+
protected final int length;
15+
protected final String[] levels;
16+
private final String string;
17+
18+
AbstractTopic() {
19+
length = 0;
20+
levels = new String[0];
21+
string = StringUtils.EMPTY;
22+
}
23+
24+
AbstractTopic(@NotNull String topicName) {
25+
26+
length = topicName.length();
27+
if (length == 0) {
28+
throw new IllegalArgumentException("Topic name has zero length.");
29+
}
30+
if (topicName.contains("//") || topicName.startsWith("/") || topicName.endsWith("/")) {
31+
throw new IllegalArgumentException("Topic name has zero length level: " + topicName);
32+
}
33+
34+
levels = topicName.split(DELIMITER);
35+
string = topicName;
36+
}
37+
38+
@Override
39+
public String toString() {
40+
return string;
41+
}
42+
43+
}

src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public class SubscribeTopicFilter {
1010
/**
1111
* The subscriber's topic name.
1212
*/
13-
private final String topicName;
13+
private final TopicFilter topicFilter;
1414

1515
/**
1616
* Maximum QoS field. This gives the maximum QoS level at which the Server
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.ss.mqtt.broker.model;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class TopicFilter extends AbstractTopic {
6+
7+
public TopicFilter(@NotNull String topicName) {
8+
super(topicName);
9+
10+
int multiPos = topicName.indexOf(MULTI_LEVEL_WILDCARD);
11+
if (multiPos != -1 && multiPos != length - 1) {
12+
throw new IllegalArgumentException("Multi level wildcard is incorrectly used: " + topicName);
13+
}
14+
if (topicName.contains("++")) {
15+
throw new IllegalArgumentException("Single level wildcard is incorrectly used: " + topicName);
16+
}
17+
}
18+
19+
public boolean matches(@NotNull TopicName topicName) {
20+
21+
if ((this.levels.length < topicName.levels.length &&
22+
!this.levels[this.levels.length - 1].equals(MULTI_LEVEL_WILDCARD)) ||
23+
(topicName.levels.length < this.levels.length &&
24+
!topicName.levels[topicName.levels.length - 1].equals(MULTI_LEVEL_WILDCARD))) {
25+
return false;
26+
}
27+
28+
int maxLength = Math.min(this.levels.length, topicName.levels.length);
29+
for (int i = 0; i < maxLength; i++) {
30+
if (!this.levels[i].equals(topicName.levels[i]) && !this.levels[i].equals(MULTI_LEVEL_WILDCARD) &&
31+
!topicName.levels[i].equals(MULTI_LEVEL_WILDCARD) && !(this.levels[i].equals(SINGLE_LEVEL_WILDCARD) ||
32+
topicName.levels[i].equals(SINGLE_LEVEL_WILDCARD))) {
33+
return false;
34+
}
35+
}
36+
return true;
37+
}
38+
}
39+
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.ss.mqtt.broker.model;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class TopicName extends AbstractTopic {
6+
7+
public static final TopicName EMPTY_TOPIC_NAME = new TopicName();
8+
9+
private TopicName() {
10+
}
11+
12+
public TopicName(@NotNull String topicName) {
13+
super(topicName);
14+
15+
if (topicName.contains(MULTI_LEVEL_WILDCARD)) {
16+
throw new IllegalArgumentException("Multi level wildcard is incorrectly used: " + topicName);
17+
}
18+
if (topicName.contains(SINGLE_LEVEL_WILDCARD)) {
19+
throw new IllegalArgumentException("Single level wildcard is incorrectly used: " + topicName);
20+
}
21+
}
22+
23+
}
24+
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package com.ss.mqtt.broker.model;
2+
3+
import static com.ss.mqtt.broker.model.AbstractTopic.MULTI_LEVEL_WILDCARD;
4+
import static com.ss.mqtt.broker.model.AbstractTopic.SINGLE_LEVEL_WILDCARD;
5+
import com.ss.mqtt.broker.network.client.MqttClient;
6+
import com.ss.rlib.common.util.array.Array;
7+
import com.ss.rlib.common.util.array.ConcurrentArray;
8+
import com.ss.rlib.common.util.dictionary.ConcurrentObjectDictionary;
9+
import com.ss.rlib.common.util.dictionary.DictionaryFactory;
10+
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
11+
import org.jetbrains.annotations.NotNull;
12+
13+
import java.util.Collection;
14+
15+
public class TopicSubscriber {
16+
17+
private static boolean filterSubscriber(@NotNull Array<Subscriber> resultArray, @NotNull Subscriber subscriber) {
18+
var found = resultArray.findAny(subscriber, Subscriber::equals);
19+
if (found != null && found.getQos().ordinal() < subscriber.getQos().ordinal()) {
20+
resultArray.remove(found);
21+
return true;
22+
} else {
23+
return false;
24+
}
25+
}
26+
27+
private static TopicSubscriber collectSubscribers(
28+
@NotNull ConcurrentObjectDictionary<String, TopicSubscriber> topicSubscribers,
29+
@NotNull String topicName,
30+
@NotNull Array<Subscriber> resultSubscribers
31+
) {
32+
var ts = topicSubscribers.get(topicName);
33+
if (ts != null) {
34+
long stamp = ts.subscribers.readLock();
35+
try {
36+
ts.subscribers.forEachFiltered(resultSubscribers, TopicSubscriber::filterSubscriber, Collection::add);
37+
} finally {
38+
ts.subscribers.readUnlock(stamp);
39+
}
40+
}
41+
return ts;
42+
}
43+
44+
private final ConcurrentObjectDictionary<String, TopicSubscriber> topicSubscribers =
45+
DictionaryFactory.newConcurrentStampedLockObjectDictionary();
46+
private final ConcurrentArray<Subscriber> subscribers = ConcurrentArray.ofType(Subscriber.class);
47+
48+
public void addSubscriber(@NotNull TopicFilter topicFilter, @NotNull Subscriber subscriber) {
49+
addSubscriber(0, topicFilter, subscriber);
50+
}
51+
52+
private void addSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull Subscriber subscriber) {
53+
if (level == topicFilter.levels.length - 1) {
54+
subscribers.runInWriteLock(subscriber, ConcurrentArray::add);
55+
} else {
56+
var topicSubscriber = topicSubscribers.getInWriteLock(
57+
topicFilter.levels[level],
58+
(ts, topic) -> ts.getOrCompute(topic, TopicSubscriber::new)
59+
);
60+
//noinspection ConstantConditions
61+
topicSubscriber.addSubscriber(level + 1, topicFilter, subscriber);
62+
}
63+
}
64+
65+
public boolean removeSubscriber(@NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
66+
return removeSubscriber(0, topicFilter, mqttClient);
67+
}
68+
69+
private boolean removeSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
70+
if (level == topicFilter.levels.length - 1) {
71+
return subscribers.removeIfInWriteLock(mqttClient, (client, subscriber) -> client.equals(subscriber.getMqttClient()));
72+
} else {
73+
var topicSubscriber = topicSubscribers.getInReadLock(topicFilter.levels[level], ObjectDictionary::get);
74+
if (topicSubscriber == null) {
75+
return false;
76+
} else {
77+
return topicSubscriber.removeSubscriber(level + 1, topicFilter, mqttClient);
78+
}
79+
}
80+
}
81+
82+
public @NotNull Array<Subscriber> matches(@NotNull TopicName topicName) {
83+
var resultArray = Array.ofType(Subscriber.class);
84+
processLevel(0, topicName.levels[0], topicName, resultArray);
85+
return resultArray;
86+
}
87+
88+
private void processLevel(
89+
int level,
90+
@NotNull String segment,
91+
@NotNull TopicName topicName,
92+
@NotNull Array<Subscriber> resultSubscribers
93+
) {
94+
var nextLevel = level + 1;
95+
processSegment(nextLevel, segment, topicName, resultSubscribers);
96+
processSegment(nextLevel, SINGLE_LEVEL_WILDCARD, topicName, resultSubscribers);
97+
processSegment(nextLevel, MULTI_LEVEL_WILDCARD, topicName, resultSubscribers);
98+
}
99+
100+
private void processSegment(
101+
int nextLevel,
102+
@NotNull String segment,
103+
@NotNull TopicName topicName,
104+
@NotNull Array<Subscriber> resultSubscribers
105+
) {
106+
var topicSubscriber = topicSubscribers.getInReadLock(
107+
segment,
108+
resultSubscribers,
109+
TopicSubscriber::collectSubscribers
110+
);
111+
if (topicSubscriber != null && nextLevel < topicName.levels.length) {
112+
var nextSegment = topicName.levels[nextLevel];
113+
topicSubscriber.processLevel(nextLevel, nextSegment, topicName, resultSubscribers);
114+
}
115+
}
116+
}

src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

3+
import static com.ss.mqtt.broker.model.TopicName.EMPTY_TOPIC_NAME;
34
import com.ss.mqtt.broker.model.MqttPropertyConstants;
45
import com.ss.mqtt.broker.model.PacketProperty;
56
import com.ss.mqtt.broker.model.QoS;
7+
import com.ss.mqtt.broker.model.TopicName;
68
import com.ss.mqtt.broker.network.MqttConnection;
79
import com.ss.mqtt.broker.network.packet.PacketType;
810
import com.ss.rlib.common.util.ArrayUtils;
@@ -247,7 +249,7 @@ public class PublishInPacket extends MqttReadablePacket {
247249
* To reduce the size of the PUBLISH packet the sender can use a Topic Alias. The Topic Alias is described
248250
* in section 3.3.2.3.4. It is a Protocol Error if the Topic Name is zero length and there is no Topic Alias.
249251
*/
250-
private @NotNull String topicName;
252+
private @NotNull TopicName topicName;
251253

252254
/**
253255
* The Packet Identifier field is only present in PUBLISH packets where the QoS level is 1 or 2. Section
@@ -274,7 +276,7 @@ public PublishInPacket(byte info) {
274276
this.qos = QoS.of((info >> 1) & 0x03);
275277
this.retained = NumberUtils.isSetBit(info, 0);
276278
this.duplicate = NumberUtils.isSetBit(info, 3);
277-
this.topicName = StringUtils.EMPTY;
279+
this.topicName = EMPTY_TOPIC_NAME;
278280
this.responseTopic = StringUtils.EMPTY;
279281
this.contentType = StringUtils.EMPTY;
280282
this.correlationData = ArrayUtils.EMPTY_BYTE_ARRAY;
@@ -290,7 +292,7 @@ public byte getPacketType() {
290292
@Override
291293
protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) {
292294
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039
293-
topicName = readString(buffer);
295+
topicName = new TopicName(readString(buffer));
294296
packetId = qos != QoS.AT_MOST_ONCE_DELIVERY ? readUnsignedShort(buffer) : 0;
295297
}
296298

src/main/java/com/ss/mqtt/broker/network/packet/in/SubscribeInPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected void readPayload(@NotNull MqttConnection connection, @NotNull ByteBuff
8989
var noLocal = !isMqtt5 || NumberUtils.isSetBit(options, 2);
9090
var rap = isMqtt5 && NumberUtils.isSetBit(options, 3);
9191

92-
topicFilters.add(new SubscribeTopicFilter(topicFilter, qos, retainHandling, noLocal, rap));
92+
topicFilters.add(new SubscribeTopicFilter(new TopicFilter(topicFilter), qos, retainHandling, noLocal, rap));
9393
}
9494
}
9595

src/main/java/com/ss/mqtt/broker/network/packet/in/UnsubscribeInPacket.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

33
import com.ss.mqtt.broker.model.PacketProperty;
4+
import com.ss.mqtt.broker.model.TopicFilter;
45
import com.ss.mqtt.broker.network.MqttConnection;
56
import com.ss.mqtt.broker.network.packet.PacketType;
67
import com.ss.rlib.common.util.array.Array;
@@ -28,12 +29,12 @@ public class UnsubscribeInPacket extends MqttReadablePacket {
2829
PacketProperty.USER_PROPERTY
2930
);
3031

31-
private @NotNull Array<String> topicFilters;
32+
private @NotNull Array<TopicFilter> topicFilters;
3233
private int packetId;
3334

3435
public UnsubscribeInPacket(byte info) {
3536
super(info);
36-
this.topicFilters = ArrayFactory.newArray(String.class);
37+
this.topicFilters = ArrayFactory.newArray(TopicFilter.class);
3738
}
3839

3940
@Override
@@ -54,7 +55,7 @@ protected void readPayload(@NotNull MqttConnection connection, @NotNull ByteBuff
5455
}
5556

5657
while (buffer.hasRemaining()) {
57-
topicFilters.add(readString(buffer));
58+
topicFilters.add(new TopicFilter(readString(buffer)));
5859
}
5960
}
6061

src/main/java/com/ss/mqtt/broker/service/SubscriptionService.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
package com.ss.mqtt.broker.service;
22

3-
import com.ss.mqtt.broker.model.ActionResult;
4-
import com.ss.mqtt.broker.model.SubscribeAckReasonCode;
5-
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
6-
import com.ss.mqtt.broker.model.UnsubscribeAckReasonCode;
3+
import com.ss.mqtt.broker.model.*;
74
import com.ss.mqtt.broker.network.client.MqttClient;
8-
import com.ss.rlib.common.function.NotNullNullableTripleFunction;
5+
import com.ss.rlib.common.function.NotNullNullableBiFunction;
96
import com.ss.rlib.common.util.array.Array;
10-
import com.ss.rlib.common.util.array.ConcurrentArray;
117
import org.jetbrains.annotations.NotNull;
128

139
/**
@@ -24,9 +20,9 @@ public interface SubscriptionService {
2420
* @return {@link ActionResult} of function
2521
*/
2622
@NotNull <A> ActionResult forEachTopicSubscriber(
27-
@NotNull String topicName,
23+
@NotNull TopicName topicName,
2824
@NotNull A argument,
29-
@NotNull NotNullNullableTripleFunction<ConcurrentArray<SubscribeTopicFilter>, MqttClient, A, Boolean> action
25+
@NotNull NotNullNullableBiFunction<Subscriber, A, Boolean> action
3026
);
3127

3228
/**
@@ -45,11 +41,11 @@ public interface SubscriptionService {
4541
* Removes MQTT client from subscribers by array of topic names
4642
*
4743
* @param mqttClient MQTT client to be removed
48-
* @param topicNames topic names
44+
* @param topicFilters topic names
4945
* @return array of unsubscribe ack reason codes
5046
*/
5147
@NotNull Array<UnsubscribeAckReasonCode> unsubscribe(
5248
@NotNull MqttClient mqttClient,
53-
@NotNull Array<String> topicNames
49+
@NotNull Array<TopicFilter> topicFilters
5450
);
5551
}

0 commit comments

Comments
 (0)