From a9a23a2c94835119152ec38c9903e31f4f60c9a8 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 00:32:56 +0100 Subject: [PATCH 1/7] Reduce interlocking during concurrent access --- buildSrc/build.gradle | 6 +- .../src/main/groovy/configure-java.gradle | 5 +- .../tree/SubscriberNodeBenchmark.java | 122 +++++++++++ .../tree/OptimizedSubscriberNode.java | 194 ++++++++++++++++++ .../topic/tree/SubscriberTreeTest.groovy | 2 +- 5 files changed, 326 insertions(+), 3 deletions(-) create mode 100644 model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java create mode 100644 model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 3a85dcfe..b29414ad 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -5,4 +5,8 @@ plugins { repositories { mavenCentral() gradlePluginPortal() -} \ No newline at end of file +} + +dependencies { + implementation 'me.champeau.jmh:jmh-gradle-plugin:0.7.3' +} diff --git a/buildSrc/src/main/groovy/configure-java.gradle b/buildSrc/src/main/groovy/configure-java.gradle index 78f290de..ceb86373 100644 --- a/buildSrc/src/main/groovy/configure-java.gradle +++ b/buildSrc/src/main/groovy/configure-java.gradle @@ -4,6 +4,7 @@ plugins { id("java") id("java-test-fixtures") id("configure-jacoco") + id("me.champeau.jmh") } java { @@ -16,6 +17,8 @@ dependencies { compileOnly libs.jspecify compileOnly libs.lombok annotationProcessor libs.lombok + jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.37' + jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.37' } test { @@ -36,4 +39,4 @@ tasks.withType(JavaCompile).configureEach { processResources { filter(ReplaceTokens, tokens: []) -} \ No newline at end of file +} diff --git a/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java b/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java new file mode 100644 index 00000000..8612c13b --- /dev/null +++ b/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java @@ -0,0 +1,122 @@ +package javasabr.mqtt.model.subscriber.tree; + +import java.util.concurrent.TimeUnit; +import javasabr.mqtt.model.subscriber.SingleSubscriber; +import javasabr.mqtt.model.subscription.Subscription; +import javasabr.mqtt.model.subscription.TestMqttUser; +import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; +import javasabr.mqtt.model.topic.tree.SubscriberTreeTest; +import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.array.MutableArray; +import org.jspecify.annotations.NonNull; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Threads(10) +public class SubscriberNodeBenchmark { + + private SubscriberNode originalImplementation; + private OptimizedSubscriberNode optimizedImplementation; + + private TopicName[] topicNames; + private int[] lastLevels; + + private static final String[] TEST_TOPIC_NAMES = { + "home/kitchen/sensor/temp", + "home/livingroom/light/state", + "system/events/alert/priority/low", + "command/reboot/device/101", + "$SYS/broker/clients/total", + "data", + "metrics/cpu/usage", + "user/id_12345/settings", + "long/path/with/many/segments/is/unlikely/but/possible", + "a/b/c/d/e/f/g", + "short" + }; + + private static final String[] TEST_FILTERS = { + "home/#", + "home/kitchen/sensor/temp", + "home/+/sensor/temp", + "home/+/+/temp", + "system/events/+/priority/#", + "$SYS/#", + "$SYS/broker/clients/+", + "#", + "+", + "+/+/+/+", + "a/b/c/d/e/f/g/#", + "+/kitchen/sensor/temp", + "data", + "data/#", + "command/+", + "system/events/#", + "user/+/settings" + }; + + @Setup + public void setup() { + topicNames = new TopicName[TEST_TOPIC_NAMES.length]; + lastLevels = new int[TEST_TOPIC_NAMES.length]; + for (int i = 0; i < TEST_TOPIC_NAMES.length; i++) { + topicNames[i] = new TopicName(TEST_TOPIC_NAMES[i]); + lastLevels[i] = topicNames[i].levelsCount() - 1; + } + + originalImplementation = new SubscriberNode(); + optimizedImplementation = new OptimizedSubscriberNode(); + for (String rawTopic : TEST_FILTERS) { + TopicFilter topicFilter = new TopicFilter(rawTopic); + Subscription subscription = SubscriberTreeTest.makeSubscription(rawTopic); + TestMqttUser owner = new TestMqttUser("id"); + originalImplementation.subscribe(0, owner, subscription, topicFilter); + optimizedImplementation.subscribe(0, owner, subscription, topicFilter); + } + } + + @State(Scope.Thread) + public static class ThreadState { + int index = 0; + public MutableArray<@NonNull SingleSubscriber> container; + + @Setup(Level.Trial) + public void setupThread() { + container = ArrayFactory.mutableArray(SingleSubscriber.class); + } + } + + @Benchmark + public void originalImplementation(ThreadState state, Blackhole bh) { + int i = state.index % topicNames.length; + originalImplementation.matchesTo(0, topicNames[i], lastLevels[i], state.container); + bh.consume(state.container); + state.index++; + } + + @Benchmark + public void optimizedImplementation(ThreadState state, Blackhole bh) { + int i = state.index % topicNames.length; + optimizedImplementation.matchesTo(0, topicNames[i], lastLevels[i], state.container); + bh.consume(state.container); + state.index++; + } +} diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java new file mode 100644 index 00000000..f59ebec5 --- /dev/null +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -0,0 +1,194 @@ +package javasabr.mqtt.model.subscriber.tree; + +import java.util.function.Supplier; +import javasabr.mqtt.base.util.DebugUtils; +import javasabr.mqtt.model.MqttUser; +import javasabr.mqtt.model.QoS; +import javasabr.mqtt.model.subscriber.SingleSubscriber; +import javasabr.mqtt.model.subscriber.Subscriber; +import javasabr.mqtt.model.subscription.Subscription; +import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; +import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.array.LockableArray; +import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.dictionary.DictionaryFactory; +import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; + +@Getter(AccessLevel.PACKAGE) +@Accessors(fluent = true, chain = false) +@FieldDefaults(level = AccessLevel.PRIVATE) +class OptimizedSubscriberNode extends SubscriberTreeBase { + + private final static Supplier SUBSCRIBER_NODE_FACTORY = OptimizedSubscriberNode::new; + + static { + DebugUtils.registerIncludedFields("childNodes", "subscribers"); + } + + protected static void appendSubscribersTo(MutableArray result, OptimizedSubscriberNode subscriberNode) { + LockableArray subscribers = subscriberNode.subscribers(); + if (subscribers == null) { + return; + } + long stamp = subscribers.readLock(); + try { + for (Subscriber subscriber : subscribers) { + SingleSubscriber singleSubscriber = subscriber.resolveSingle(); + if (removeDuplicateWithLowerQoS(result, singleSubscriber)) { + result.add(singleSubscriber); + } + } + } finally { + subscribers.readUnlock(stamp); + } + } + + private static boolean removeDuplicateWithLowerQoS( + MutableArray result, SingleSubscriber candidate) { + int found = result.indexOf(SingleSubscriber::user, candidate.user()); + if (found == -1) { + return true; + } + QoS candidateQos = candidate.qos(); + SingleSubscriber exist = result.get(found); + QoS existeQos = exist.qos(); + if (existeQos.ordinal() < candidateQos.ordinal()) { + result.remove(found); + return true; + } + return false; + } + + @Nullable + volatile LockableRefToRefDictionary childNodes; + @Nullable + volatile LockableArray subscribers; + + /** + * @return the previous subscription from the same owner + */ + @Nullable + public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { + if (level == topicFilter.levelsCount()) { + return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter); + } + OptimizedSubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level)); + return childNode.subscribe(level + 1, owner, subscription, topicFilter); + } + + public boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { + if (level == topicFilter.levelsCount()) { + return removeSubscriber(subscribers(), owner, topicFilter); + } + OptimizedSubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level)); + return childNode.unsubscribe(level + 1, owner, topicFilter); + } + + protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableArray container) { + LockableRefToRefDictionary nodes = childNodes(); + if (nodes == null) { + return; + } + long stamp = nodes.readLock(); + try { + collectSegmentMatches(nodes, topicName.segment(level), level, topicName, lastLevel, container); + collectSegmentMatches(nodes, TopicFilter.SINGLE_LEVEL_WILDCARD, level, topicName, lastLevel, container); + collectWildcardMatches(nodes, container); + } finally { + nodes.readUnlock(stamp); + } + } + + private void collectSegmentMatches( + @Nullable LockableRefToRefDictionary childNodes, + String segment, + int level, + TopicName topicName, + int lastLevel, + MutableArray result) { + if (childNodes == null) { + return; + } + OptimizedSubscriberNode subscriberNode = childNodes.get(segment); + if (subscriberNode == null) { + return; + } + if (level == lastLevel) { + appendSubscribersTo(result, subscriberNode); + } else if (level < lastLevel) { + subscriberNode.matchesTo(level + 1, topicName, lastLevel, result); + } + } + + private void collectWildcardMatches( + @Nullable LockableRefToRefDictionary childNodes, + MutableArray result) { + if (childNodes == null) { + return; + } + OptimizedSubscriberNode subscriberNode = childNodes.get(TopicFilter.MULTI_LEVEL_WILDCARD); + if (subscriberNode != null) { + appendSubscribersTo(result, subscriberNode); + } + } + + private OptimizedSubscriberNode getOrCreateChildNode(String segment) { + LockableRefToRefDictionary childNodes = getOrCreateChildNodes(); + long stamp = childNodes.readLock(); + try { + OptimizedSubscriberNode subscriberNode = childNodes.get(segment); + if (subscriberNode != null) { + return subscriberNode; + } + } finally { + childNodes.readUnlock(stamp); + } + stamp = childNodes.writeLock(); + try { + return childNodes.getOrCompute(segment, SUBSCRIBER_NODE_FACTORY); + } finally { + childNodes.writeUnlock(stamp); + } + } + + private LockableRefToRefDictionary getOrCreateChildNodes() { + LockableRefToRefDictionary current = childNodes; + if (current != null) { + return current; + } + synchronized (this) { + current = childNodes; + if (current == null) { + current = DictionaryFactory.stampedLockBasedRefToRefDictionary(); + childNodes = current; + } + return current; + } + } + + private LockableArray getOrCreateSubscribers() { + LockableArray current = subscribers; + if (current != null) { + return current; + } + synchronized (this) { + current = subscribers; + if (current == null) { + current = ArrayFactory.stampedLockBasedArray(Subscriber.class); + subscribers = current; + } + return current; + } + } + + @Override + public String toString() { + return DebugUtils.toJsonString(this); + } +} diff --git a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy index cfb17623..a9b0c225 100644 --- a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy +++ b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy @@ -624,7 +624,7 @@ class SubscriberTreeTest extends UnitSpecification { return new TestMqttUser(id) } - static def makeSubscription(String topicFilter) { + static Subscription makeSubscription(String topicFilter) { return new Subscription( TopicFilter.valueOf(topicFilter), MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET, From 5642c40cc2bba16f71128dfca8de445d3a51335e Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 01:17:04 +0100 Subject: [PATCH 2/7] Add tests for optimized implementation --- .../impl/InMemorySubscriptionService.java | 3 +- .../tree/ConcurrentSubscriberTree.java | 8 +- .../tree/OptimizedSubscriberNode.java | 9 +- .../model/subscriber/tree/SubscriberNode.java | 8 +- .../subscriber/tree/SubscriberTreeBase.java | 24 +- .../topic/tree/SubscriberTreeTest.groovy | 413 +++++++++++++++++- 6 files changed, 427 insertions(+), 38 deletions(-) diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java index fcc635ba..3bae73f7 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java @@ -11,6 +11,7 @@ import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscriber.Subscriber; import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree; +import javasabr.mqtt.model.subscriber.tree.SubscriberNode; import javasabr.mqtt.model.subscription.Subscription; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; @@ -34,7 +35,7 @@ public class InMemorySubscriptionService implements SubscriptionService { ConcurrentSubscriberTree subscriberTree; public InMemorySubscriptionService() { - this.subscriberTree = new ConcurrentSubscriberTree(); + this.subscriberTree = new ConcurrentSubscriberTree(new SubscriberNode()); } @Override diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java index 307db58c..c296fe5f 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java @@ -9,17 +9,15 @@ import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.ThreadSafe; import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; import org.jspecify.annotations.Nullable; +@RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class ConcurrentSubscriberTree implements ThreadSafe { - SubscriberNode rootNode; - - public ConcurrentSubscriberTree() { - this.rootNode = new SubscriberNode(); - } + SubscriberTreeBase rootNode; @Nullable public SingleSubscriber subscribe(MqttUser user, Subscription subscription) { diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java index f59ebec5..8ce82f12 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -23,7 +23,7 @@ @Getter(AccessLevel.PACKAGE) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PRIVATE) -class OptimizedSubscriberNode extends SubscriberTreeBase { +public class OptimizedSubscriberNode extends SubscriberTreeBase { private final static Supplier SUBSCRIBER_NODE_FACTORY = OptimizedSubscriberNode::new; @@ -31,7 +31,9 @@ class OptimizedSubscriberNode extends SubscriberTreeBase { DebugUtils.registerIncludedFields("childNodes", "subscribers"); } - protected static void appendSubscribersTo(MutableArray result, OptimizedSubscriberNode subscriberNode) { + protected static void appendSubscribersTo( + MutableArray result, + OptimizedSubscriberNode subscriberNode) { LockableArray subscribers = subscriberNode.subscribers(); if (subscribers == null) { return; @@ -50,7 +52,8 @@ protected static void appendSubscribersTo(MutableArray result, } private static boolean removeDuplicateWithLowerQoS( - MutableArray result, SingleSubscriber candidate) { + MutableArray result, + SingleSubscriber candidate) { int found = result.indexOf(SingleSubscriber::user, candidate.user()); if (found == -1) { return true; diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java index 4a6579c2..248acbcb 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java @@ -22,7 +22,7 @@ @Getter(AccessLevel.PACKAGE) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PRIVATE) -class SubscriberNode extends SubscriberTreeBase { +public class SubscriberNode extends SubscriberTreeBase { private final static Supplier SUBSCRIBER_NODE_FACTORY = SubscriberNode::new; @@ -61,11 +61,7 @@ protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableA multiWildcardTopicMatch(container); } - private void exactlyTopicMatch( - int level, - TopicName topicName, - int lastLevel, - MutableArray result) { + private void exactlyTopicMatch(int level, TopicName topicName, int lastLevel, MutableArray result) { String segment = topicName.segment(level); SubscriberNode subscriberNode = childNode(segment); if (subscriberNode == null) { diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java index 972b696b..01f9f083 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java @@ -9,6 +9,7 @@ import javasabr.mqtt.model.subscription.Subscription; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.LockableArray; import javasabr.rlib.collections.array.MutableArray; import lombok.AccessLevel; @@ -18,7 +19,21 @@ @RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true) -abstract class SubscriberTreeBase { +public abstract class SubscriberTreeBase { + + protected abstract void matchesTo( + int level, + TopicName topicName, + int lastLevel, + MutableArray container); + + public abstract boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter); + + public abstract SingleSubscriber subscribe( + int level, + MqttUser owner, + Subscription subscription, + TopicFilter topicFilter); /** * @return previous subscriber with the same user @@ -45,9 +60,7 @@ protected static SingleSubscriber addSubscriber( } @Nullable - private static SingleSubscriber removePreviousIfExist( - LockableArray subscribers, - MqttUser user) { + private static SingleSubscriber removePreviousIfExist(LockableArray subscribers, MqttUser user) { int index = subscribers.indexOf(Subscriber::resolveUser, user); if (index < 0) { return null; @@ -142,7 +155,8 @@ private static boolean isSharedSubscriberWithGroup(Subscriber subscriber, String } private static boolean removeDuplicateWithLowerQoS( - MutableArray result, SingleSubscriber candidate) { + MutableArray result, + SingleSubscriber candidate) { int found = result.indexOf(SingleSubscriber::user, candidate.user()); if (found == -1) { diff --git a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy index a9b0c225..82f74c0d 100644 --- a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy +++ b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy @@ -6,6 +6,9 @@ import javasabr.mqtt.model.QoS import javasabr.mqtt.model.SubscribeRetainHandling import javasabr.mqtt.model.subscriber.SingleSubscriber import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree +import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode +import javasabr.mqtt.model.subscriber.tree.SubscriberNode +import javasabr.mqtt.model.subscriber.tree.SubscriberTreeBase import javasabr.mqtt.model.subscription.Subscription import javasabr.mqtt.model.subscription.TestMqttUser import javasabr.mqtt.model.topic.SharedTopicFilter @@ -19,9 +22,10 @@ class SubscriberTreeTest extends UnitSpecification { List subscriptions, List users, String topicName, - List expectedUsers) { + List expectedUsers, + SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -32,11 +36,38 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedUsers where: topicName << [ + "/topic/segment1", + "/topic/segment2", + "/topic/segment3", "/topic/segment1", "/topic/segment2", "/topic/segment3" ] subscriptions << [ + [ + makeSubscription("/topic/segment1"), + makeSubscription("/topic/segment2"), + makeSubscription("/topic/segment1/segment2"), + makeSubscription("/topic/"), + makeSubscription("/topic") + ], + [ + makeSubscription("/topic/segment1"), + makeSubscription("/topic/segment2"), + makeSubscription("/topic/segment1/segment2"), + makeSubscription("/topic/"), + makeSubscription("/topic/segment2"), + makeSubscription("/"), + makeSubscription("/topic/segment2/segment1") + ], + [ + makeSubscription("/topic/segment1"), + makeSubscription("/topic/segment2"), + makeSubscription("/topic/segment3"), + makeSubscription("/topic/segment3"), + makeSubscription("/topic/segment3"), + makeSubscription("/topic/segment3") + ], [ makeSubscription("/topic/segment1"), makeSubscription("/topic/segment2"), @@ -63,6 +94,30 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5") + ], + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7") + ], + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3"), + makeUser("id4") + ], [ makeUser("id1"), makeUser("id2"), @@ -89,6 +144,17 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedUsers << [ + [ + makeUser("id1") + ], + [ + makeUser("id2"), + makeUser("id5") + ], + [ + makeUser("id3"), + makeUser("id4") + ], [ makeUser("id1") ], @@ -101,15 +167,24 @@ class SubscriberTreeTest extends UnitSpecification { makeUser("id4") ] ] + subscriberNode << [ + new SubscriberNode(), + new SubscriberNode(), + new SubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode() + ] } def "should match single wildcard topic correctly"( List subscriptions, List users, String topicName, - List expectedUsers) { + List expectedUsers, + SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -120,11 +195,44 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedUsers where: topicName << [ + "/topic/segment1", + "/topic/segment2", + "/topic/segment3", "/topic/segment1", "/topic/segment2", "/topic/segment3" ] subscriptions << [ + [ + makeSubscription("/topic/segment1"), + makeSubscription("/topic/+"), + makeSubscription("/+/segment1"), + makeSubscription("/+/+"), + makeSubscription("/topic/segment2"), + makeSubscription("/topic2/segment1"), + makeSubscription("/+/segment2"), + makeSubscription("/topic2/+") + ], + [ + makeSubscription("/topic/segment1"), + makeSubscription("/topic/+"), + makeSubscription("/+/segment1"), + makeSubscription("/+/+"), + makeSubscription("/topic/segment2"), + makeSubscription("/topic2/segment1"), + makeSubscription("/+/segment2"), + makeSubscription("/topic2/+") + ], + [ + makeSubscription("/topic/segment1"), + makeSubscription("/topic/+"), + makeSubscription("/+/segment1"), + makeSubscription("/+/+"), + makeSubscription("/topic/segment2"), + makeSubscription("/topic2/segment1"), + makeSubscription("/+/segment2"), + makeSubscription("/topic2/+") + ], [ makeSubscription("/topic/segment1"), makeSubscription("/topic/+"), @@ -157,6 +265,36 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8") + ], + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8") + ], + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8") + ], [ makeUser("id1"), makeUser("id2"), @@ -189,6 +327,22 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedUsers << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4") + ], + [ + makeUser("id2"), + makeUser("id4"), + makeUser("id5"), + makeUser("id7") + ], + [ + makeUser("id2"), + makeUser("id4") + ], [ makeUser("id1"), makeUser("id2"), @@ -206,15 +360,24 @@ class SubscriberTreeTest extends UnitSpecification { makeUser("id4") ] ] + subscriberNode << [ + new SubscriberNode(), + new SubscriberNode(), + new SubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode() + ] } def "should match multi wildcard topic correctly"( List subscriptions, List users, String topicName, - List expectedUsers) { + List expectedUsers, + SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -225,11 +388,47 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedUsers where: topicName << [ + "/topic/segment1/segment2", + "/topic/segment3/segment4", + "/topic/segment2", "/topic/segment1/segment2", "/topic/segment3/segment4", "/topic/segment2" ] subscriptions << [ + [ + makeSubscription("/topic/segment1/segment2"), + makeSubscription("/topic/segment1/#"), + makeSubscription("/topic/#"), + makeSubscription("/#"), + makeSubscription("#"), + makeSubscription("/topic/segment2/segment3"), + makeSubscription("/topic/segment2/#"), + makeSubscription("/topic/segment3/segment4"), + makeSubscription("/topic/segment3/#") + ], + [ + makeSubscription("/topic/segment1/segment2"), + makeSubscription("/topic/segment1/#"), + makeSubscription("/topic/#"), + makeSubscription("/#"), + makeSubscription("#"), + makeSubscription("/topic/segment2/segment3"), + makeSubscription("/topic/segment2/#"), + makeSubscription("/topic/segment3/segment4"), + makeSubscription("/topic/segment3/#") + ], + [ + makeSubscription("/topic/segment1/segment2"), + makeSubscription("/topic/segment1/#"), + makeSubscription("/topic/#"), + makeSubscription("/#"), + makeSubscription("#"), + makeSubscription("/topic/segment2/segment3"), + makeSubscription("/topic/segment2/#"), + makeSubscription("/topic/segment3/segment4"), + makeSubscription("/topic/segment3/#") + ], [ makeSubscription("/topic/segment1/segment2"), makeSubscription("/topic/segment1/#"), @@ -265,6 +464,39 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8"), + makeUser("id9") + ], + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8"), + makeUser("id9") + ], + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8"), + makeUser("id9") + ], [ makeUser("id1"), makeUser("id2"), @@ -300,6 +532,25 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedUsers << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5") + ], + [ + makeUser("id8"), + makeUser("id9"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5") + ], + [ + makeUser("id3"), + makeUser("id4"), + makeUser("id5") + ], [ makeUser("id1"), makeUser("id2"), @@ -320,15 +571,24 @@ class SubscriberTreeTest extends UnitSpecification { makeUser("id5") ] ] + subscriberNode << [ + new SubscriberNode(), + new SubscriberNode(), + new SubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode() + ] } def "should choose strongest QoS when the same subscriber has several matches"( List subscriptions, List users, String topicName, - List expectedSubscribers) { + List expectedSubscribers, + SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -338,11 +598,47 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedSubscribers where: topicName << [ + "/topic/segment1/segment2", + "/topic/segment3", + "/topic/segment2/", "/topic/segment1/segment2", "/topic/segment3", "/topic/segment2/" ] subscriptions << [ + [ + makeSubscription("/topic/segment1/segment2", 2), + makeSubscription("/topic/segment1/#", 1), + makeSubscription("/topic/#", 0), + makeSubscription("/topic/segment1/segment3", 2), + makeSubscription("/topic/segment1/#", 1), + makeSubscription("/topic/#", 0), + makeSubscription("/topic/segment2/segment3", 2), + makeSubscription("/topic/segment2/#", 1), + makeSubscription("/topic/#", 0) + ], + [ + makeSubscription("/topic/segment1/segment2", 2), + makeSubscription("/topic/segment1/#", 1), + makeSubscription("/topic/#", 0), + makeSubscription("/topic/segment1/segment3", 2), + makeSubscription("/topic/segment1/#", 1), + makeSubscription("/topic/#", 0), + makeSubscription("/topic/segment2/segment3", 2), + makeSubscription("/topic/segment2/#", 1), + makeSubscription("/topic/#", 0) + ], + [ + makeSubscription("/topic/segment1/segment2", 2), + makeSubscription("/topic/segment1/#", 1), + makeSubscription("/topic/#", 0), + makeSubscription("/topic/segment1/segment3", 2), + makeSubscription("/topic/segment1/#", 1), + makeSubscription("/topic/#", 0), + makeSubscription("/topic/segment2/segment3", 2), + makeSubscription("/topic/segment2/#", 1), + makeSubscription("/topic/#", 0) + ], [ makeSubscription("/topic/segment1/segment2", 2), makeSubscription("/topic/segment1/#", 1), @@ -378,6 +674,39 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ + [ + makeUser("id1"), + makeUser("id1"), + makeUser("id1"), + makeUser("id2"), + makeUser("id2"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3") + ], + [ + makeUser("id1"), + makeUser("id1"), + makeUser("id1"), + makeUser("id2"), + makeUser("id2"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3") + ], + [ + makeUser("id1"), + makeUser("id1"), + makeUser("id1"), + makeUser("id2"), + makeUser("id2"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3") + ], [ makeUser("id1"), makeUser("id1"), @@ -413,6 +742,21 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedSubscribers << [ + [ + new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/segment1/segment2", 2)), + new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/segment1/#", 1)), + new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/#", 0)), + ], + [ + new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/#", 0)), + ], + [ + new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/segment2/#", 1)), + ], [ new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/segment1/segment2", 2)), new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/segment1/#", 1)), @@ -429,13 +773,21 @@ class SubscriberTreeTest extends UnitSpecification { new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/segment2/#", 1)), ] ] + subscriberNode << [ + new SubscriberNode(), + new SubscriberNode(), + new SubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode(), + new OptimizedSubscriberNode() + ] } - def "should provide different owners when math shared topic"() { + def "should provide different owners when math shared topic"(SubscriberTreeBase subscriberNode) { given: def group1 = ["id1", "id2", "id3", "id4", "id5"] def group2 = ["id6", "id7", "id8", "id9", "id10"] - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriberTree.subscribe(makeUser("id1"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id2"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id3"), makeSharedSubscription('$share/group1/topic/name1')) @@ -464,11 +816,16 @@ class SubscriberTreeTest extends UnitSpecification { (group1.contains(matched[1]) && group2.contains(matched[0])) (group1.contains(matched2[0]) && group2.contains(matched2[1])) || (group1.contains(matched2[1]) && group2.contains(matched2[0])) + where: + subscriberNode << [ + new SubscriberNode(), + new OptimizedSubscriberNode() + ] } - def "should subscribe and unsubscribe simple topic correctly correctly"() { + def "should subscribe and unsubscribe simple topic correctly correctly"(SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriberTree.subscribe(makeUser("id1"), makeSubscription('topic/name1')) subscriberTree.subscribe(makeUser("id2"), makeSubscription('topic/name1')) subscriberTree.subscribe(makeUser("id3"), makeSubscription('topic/name1')) @@ -501,11 +858,16 @@ class SubscriberTreeTest extends UnitSpecification { matched.size() == 0 id1WasUnsubscribed !id3WasUnsubscribed + where: + subscriberNode << [ + new SubscriberNode(), + new OptimizedSubscriberNode() + ] } - def "should subscribe and unsubscribe shared topic correctly correctly"() { + def "should subscribe and unsubscribe shared topic correctly correctly"(SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) subscriberTree.subscribe(makeUser("id1"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id2"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id3"), makeSharedSubscription('$share/group1/topic/name1')) @@ -538,11 +900,16 @@ class SubscriberTreeTest extends UnitSpecification { matched.size() == 0 id1WasUnsubscribed !id3WasUnsubscribed + where: + subscriberNode << [ + new SubscriberNode(), + new OptimizedSubscriberNode() + ] } - def "should replace the same subscriptions"() { + def "should replace the same subscriptions"(SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) def owner1 = makeUser("id1") def originalSub = makeSubscription('topic/name1') def replacementSub = makeSubscription('topic/name1') @@ -566,11 +933,16 @@ class SubscriberTreeTest extends UnitSpecification { matched.first().subscription() == replacementSub previous != null previous.subscription() == originalSub + where: + subscriberNode << [ + new SubscriberNode(), + new OptimizedSubscriberNode() + ] } - def "should extend shared subscription group on multiply subscribing by the same topic"() { + def "should extend shared subscription group on multiply subscribing by the same topic"(SubscriberTreeBase subscriberNode) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) def owner1 = makeUser("id1") def owner2 = makeUser("id2") subscriberTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1')) @@ -618,6 +990,11 @@ class SubscriberTreeTest extends UnitSpecification { then: matched.size() == 1 matched.first().user() == owner1 + where: + subscriberNode << [ + new SubscriberNode(), + new OptimizedSubscriberNode() + ] } static def makeUser(String id) { From 880b3147cba7d9482969cf07afddc7418833d1f1 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 01:42:13 +0100 Subject: [PATCH 3/7] Revert unnecessary changes --- .../config/MqttBrokerSpringConfig.java | 14 +- .../impl/InMemorySubscriptionService.java | 6 +- .../tree/ConcurrentSubscriberTree.java | 4 + .../topic/tree/SubscriberTreeTest.groovy | 413 +----------------- 4 files changed, 38 insertions(+), 399 deletions(-) diff --git a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java index eb4c4470..1ec60b01 100644 --- a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java +++ b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java @@ -5,6 +5,10 @@ import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttServerConnectionConfig; import javasabr.mqtt.model.QoS; +import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree; +import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode; +import javasabr.mqtt.model.subscriber.tree.SubscriberNode; +import javasabr.mqtt.model.subscriber.tree.SubscriberTreeBase; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.MqttConnectionFactory; import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler; @@ -65,6 +69,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; +@SuppressWarnings("unused") @CustomLog @Configuration(proxyBeanMethods = false) public class MqttBrokerSpringConfig { @@ -98,8 +103,13 @@ AuthenticationService authenticationService( } @Bean - SubscriptionService subscriptionService() { - return new InMemorySubscriptionService(); + ConcurrentSubscriberTree subscriberTree() { + return new ConcurrentSubscriberTree(new SubscriberNode()); + } + + @Bean + SubscriptionService subscriptionService(ConcurrentSubscriberTree subscriberTree) { + return new InMemorySubscriptionService(subscriberTree); } @Bean diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java index 3bae73f7..92241322 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java @@ -11,7 +11,7 @@ import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscriber.Subscriber; import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree; -import javasabr.mqtt.model.subscriber.tree.SubscriberNode; +import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode; import javasabr.mqtt.model.subscription.Subscription; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; @@ -23,19 +23,21 @@ import javasabr.rlib.collections.array.MutableArray; import lombok.AccessLevel; import lombok.CustomLog; +import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; /** * In memory subscription service based on {@link ConcurrentSubscriberTree} */ @CustomLog +@RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class InMemorySubscriptionService implements SubscriptionService { ConcurrentSubscriberTree subscriberTree; public InMemorySubscriptionService() { - this.subscriberTree = new ConcurrentSubscriberTree(new SubscriberNode()); + this.subscriberTree = new ConcurrentSubscriberTree(new OptimizedSubscriberNode()); } @Override diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java index c296fe5f..2c925ec4 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java @@ -19,6 +19,10 @@ public class ConcurrentSubscriberTree implements ThreadSafe { SubscriberTreeBase rootNode; + public ConcurrentSubscriberTree() { + rootNode = new OptimizedSubscriberNode(); + } + @Nullable public SingleSubscriber subscribe(MqttUser user, Subscription subscription) { return rootNode.subscribe(0, user, subscription, subscription.topicFilter()); diff --git a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy index 82f74c0d..a9b0c225 100644 --- a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy +++ b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy @@ -6,9 +6,6 @@ import javasabr.mqtt.model.QoS import javasabr.mqtt.model.SubscribeRetainHandling import javasabr.mqtt.model.subscriber.SingleSubscriber import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree -import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode -import javasabr.mqtt.model.subscriber.tree.SubscriberNode -import javasabr.mqtt.model.subscriber.tree.SubscriberTreeBase import javasabr.mqtt.model.subscription.Subscription import javasabr.mqtt.model.subscription.TestMqttUser import javasabr.mqtt.model.topic.SharedTopicFilter @@ -22,10 +19,9 @@ class SubscriberTreeTest extends UnitSpecification { List subscriptions, List users, String topicName, - List expectedUsers, - SubscriberTreeBase subscriberNode) { + List expectedUsers) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -36,38 +32,11 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedUsers where: topicName << [ - "/topic/segment1", - "/topic/segment2", - "/topic/segment3", "/topic/segment1", "/topic/segment2", "/topic/segment3" ] subscriptions << [ - [ - makeSubscription("/topic/segment1"), - makeSubscription("/topic/segment2"), - makeSubscription("/topic/segment1/segment2"), - makeSubscription("/topic/"), - makeSubscription("/topic") - ], - [ - makeSubscription("/topic/segment1"), - makeSubscription("/topic/segment2"), - makeSubscription("/topic/segment1/segment2"), - makeSubscription("/topic/"), - makeSubscription("/topic/segment2"), - makeSubscription("/"), - makeSubscription("/topic/segment2/segment1") - ], - [ - makeSubscription("/topic/segment1"), - makeSubscription("/topic/segment2"), - makeSubscription("/topic/segment3"), - makeSubscription("/topic/segment3"), - makeSubscription("/topic/segment3"), - makeSubscription("/topic/segment3") - ], [ makeSubscription("/topic/segment1"), makeSubscription("/topic/segment2"), @@ -94,30 +63,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5") - ], - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7") - ], - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id3"), - makeUser("id3"), - makeUser("id4") - ], [ makeUser("id1"), makeUser("id2"), @@ -144,17 +89,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedUsers << [ - [ - makeUser("id1") - ], - [ - makeUser("id2"), - makeUser("id5") - ], - [ - makeUser("id3"), - makeUser("id4") - ], [ makeUser("id1") ], @@ -167,24 +101,15 @@ class SubscriberTreeTest extends UnitSpecification { makeUser("id4") ] ] - subscriberNode << [ - new SubscriberNode(), - new SubscriberNode(), - new SubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode() - ] } def "should match single wildcard topic correctly"( List subscriptions, List users, String topicName, - List expectedUsers, - SubscriberTreeBase subscriberNode) { + List expectedUsers) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -195,44 +120,11 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedUsers where: topicName << [ - "/topic/segment1", - "/topic/segment2", - "/topic/segment3", "/topic/segment1", "/topic/segment2", "/topic/segment3" ] subscriptions << [ - [ - makeSubscription("/topic/segment1"), - makeSubscription("/topic/+"), - makeSubscription("/+/segment1"), - makeSubscription("/+/+"), - makeSubscription("/topic/segment2"), - makeSubscription("/topic2/segment1"), - makeSubscription("/+/segment2"), - makeSubscription("/topic2/+") - ], - [ - makeSubscription("/topic/segment1"), - makeSubscription("/topic/+"), - makeSubscription("/+/segment1"), - makeSubscription("/+/+"), - makeSubscription("/topic/segment2"), - makeSubscription("/topic2/segment1"), - makeSubscription("/+/segment2"), - makeSubscription("/topic2/+") - ], - [ - makeSubscription("/topic/segment1"), - makeSubscription("/topic/+"), - makeSubscription("/+/segment1"), - makeSubscription("/+/+"), - makeSubscription("/topic/segment2"), - makeSubscription("/topic2/segment1"), - makeSubscription("/+/segment2"), - makeSubscription("/topic2/+") - ], [ makeSubscription("/topic/segment1"), makeSubscription("/topic/+"), @@ -265,36 +157,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7"), - makeUser("id8") - ], - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7"), - makeUser("id8") - ], - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7"), - makeUser("id8") - ], [ makeUser("id1"), makeUser("id2"), @@ -327,22 +189,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedUsers << [ - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4") - ], - [ - makeUser("id2"), - makeUser("id4"), - makeUser("id5"), - makeUser("id7") - ], - [ - makeUser("id2"), - makeUser("id4") - ], [ makeUser("id1"), makeUser("id2"), @@ -360,24 +206,15 @@ class SubscriberTreeTest extends UnitSpecification { makeUser("id4") ] ] - subscriberNode << [ - new SubscriberNode(), - new SubscriberNode(), - new SubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode() - ] } def "should match multi wildcard topic correctly"( List subscriptions, List users, String topicName, - List expectedUsers, - SubscriberTreeBase subscriberNode) { + List expectedUsers) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -388,47 +225,11 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedUsers where: topicName << [ - "/topic/segment1/segment2", - "/topic/segment3/segment4", - "/topic/segment2", "/topic/segment1/segment2", "/topic/segment3/segment4", "/topic/segment2" ] subscriptions << [ - [ - makeSubscription("/topic/segment1/segment2"), - makeSubscription("/topic/segment1/#"), - makeSubscription("/topic/#"), - makeSubscription("/#"), - makeSubscription("#"), - makeSubscription("/topic/segment2/segment3"), - makeSubscription("/topic/segment2/#"), - makeSubscription("/topic/segment3/segment4"), - makeSubscription("/topic/segment3/#") - ], - [ - makeSubscription("/topic/segment1/segment2"), - makeSubscription("/topic/segment1/#"), - makeSubscription("/topic/#"), - makeSubscription("/#"), - makeSubscription("#"), - makeSubscription("/topic/segment2/segment3"), - makeSubscription("/topic/segment2/#"), - makeSubscription("/topic/segment3/segment4"), - makeSubscription("/topic/segment3/#") - ], - [ - makeSubscription("/topic/segment1/segment2"), - makeSubscription("/topic/segment1/#"), - makeSubscription("/topic/#"), - makeSubscription("/#"), - makeSubscription("#"), - makeSubscription("/topic/segment2/segment3"), - makeSubscription("/topic/segment2/#"), - makeSubscription("/topic/segment3/segment4"), - makeSubscription("/topic/segment3/#") - ], [ makeSubscription("/topic/segment1/segment2"), makeSubscription("/topic/segment1/#"), @@ -464,39 +265,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7"), - makeUser("id8"), - makeUser("id9") - ], - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7"), - makeUser("id8"), - makeUser("id9") - ], - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5"), - makeUser("id6"), - makeUser("id7"), - makeUser("id8"), - makeUser("id9") - ], [ makeUser("id1"), makeUser("id2"), @@ -532,25 +300,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedUsers << [ - [ - makeUser("id1"), - makeUser("id2"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5") - ], - [ - makeUser("id8"), - makeUser("id9"), - makeUser("id3"), - makeUser("id4"), - makeUser("id5") - ], - [ - makeUser("id3"), - makeUser("id4"), - makeUser("id5") - ], [ makeUser("id1"), makeUser("id2"), @@ -571,24 +320,15 @@ class SubscriberTreeTest extends UnitSpecification { makeUser("id5") ] ] - subscriberNode << [ - new SubscriberNode(), - new SubscriberNode(), - new SubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode() - ] } def "should choose strongest QoS when the same subscriber has several matches"( List subscriptions, List users, String topicName, - List expectedSubscribers, - SubscriberTreeBase subscriberNode) { + List expectedSubscribers) { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> subscriberTree.subscribe(users.get(i), subscription) } @@ -598,47 +338,11 @@ class SubscriberTreeTest extends UnitSpecification { found ==~ expectedSubscribers where: topicName << [ - "/topic/segment1/segment2", - "/topic/segment3", - "/topic/segment2/", "/topic/segment1/segment2", "/topic/segment3", "/topic/segment2/" ] subscriptions << [ - [ - makeSubscription("/topic/segment1/segment2", 2), - makeSubscription("/topic/segment1/#", 1), - makeSubscription("/topic/#", 0), - makeSubscription("/topic/segment1/segment3", 2), - makeSubscription("/topic/segment1/#", 1), - makeSubscription("/topic/#", 0), - makeSubscription("/topic/segment2/segment3", 2), - makeSubscription("/topic/segment2/#", 1), - makeSubscription("/topic/#", 0) - ], - [ - makeSubscription("/topic/segment1/segment2", 2), - makeSubscription("/topic/segment1/#", 1), - makeSubscription("/topic/#", 0), - makeSubscription("/topic/segment1/segment3", 2), - makeSubscription("/topic/segment1/#", 1), - makeSubscription("/topic/#", 0), - makeSubscription("/topic/segment2/segment3", 2), - makeSubscription("/topic/segment2/#", 1), - makeSubscription("/topic/#", 0) - ], - [ - makeSubscription("/topic/segment1/segment2", 2), - makeSubscription("/topic/segment1/#", 1), - makeSubscription("/topic/#", 0), - makeSubscription("/topic/segment1/segment3", 2), - makeSubscription("/topic/segment1/#", 1), - makeSubscription("/topic/#", 0), - makeSubscription("/topic/segment2/segment3", 2), - makeSubscription("/topic/segment2/#", 1), - makeSubscription("/topic/#", 0) - ], [ makeSubscription("/topic/segment1/segment2", 2), makeSubscription("/topic/segment1/#", 1), @@ -674,39 +378,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] users << [ - [ - makeUser("id1"), - makeUser("id1"), - makeUser("id1"), - makeUser("id2"), - makeUser("id2"), - makeUser("id2"), - makeUser("id3"), - makeUser("id3"), - makeUser("id3") - ], - [ - makeUser("id1"), - makeUser("id1"), - makeUser("id1"), - makeUser("id2"), - makeUser("id2"), - makeUser("id2"), - makeUser("id3"), - makeUser("id3"), - makeUser("id3") - ], - [ - makeUser("id1"), - makeUser("id1"), - makeUser("id1"), - makeUser("id2"), - makeUser("id2"), - makeUser("id2"), - makeUser("id3"), - makeUser("id3"), - makeUser("id3") - ], [ makeUser("id1"), makeUser("id1"), @@ -742,21 +413,6 @@ class SubscriberTreeTest extends UnitSpecification { ] ] expectedSubscribers << [ - [ - new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/segment1/segment2", 2)), - new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/segment1/#", 1)), - new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/#", 0)), - ], - [ - new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/#", 0)), - ], - [ - new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/segment2/#", 1)), - ], [ new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/segment1/segment2", 2)), new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/segment1/#", 1)), @@ -773,21 +429,13 @@ class SubscriberTreeTest extends UnitSpecification { new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/segment2/#", 1)), ] ] - subscriberNode << [ - new SubscriberNode(), - new SubscriberNode(), - new SubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode(), - new OptimizedSubscriberNode() - ] } - def "should provide different owners when math shared topic"(SubscriberTreeBase subscriberNode) { + def "should provide different owners when math shared topic"() { given: def group1 = ["id1", "id2", "id3", "id4", "id5"] def group2 = ["id6", "id7", "id8", "id9", "id10"] - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriberTree.subscribe(makeUser("id1"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id2"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id3"), makeSharedSubscription('$share/group1/topic/name1')) @@ -816,16 +464,11 @@ class SubscriberTreeTest extends UnitSpecification { (group1.contains(matched[1]) && group2.contains(matched[0])) (group1.contains(matched2[0]) && group2.contains(matched2[1])) || (group1.contains(matched2[1]) && group2.contains(matched2[0])) - where: - subscriberNode << [ - new SubscriberNode(), - new OptimizedSubscriberNode() - ] } - def "should subscribe and unsubscribe simple topic correctly correctly"(SubscriberTreeBase subscriberNode) { + def "should subscribe and unsubscribe simple topic correctly correctly"() { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriberTree.subscribe(makeUser("id1"), makeSubscription('topic/name1')) subscriberTree.subscribe(makeUser("id2"), makeSubscription('topic/name1')) subscriberTree.subscribe(makeUser("id3"), makeSubscription('topic/name1')) @@ -858,16 +501,11 @@ class SubscriberTreeTest extends UnitSpecification { matched.size() == 0 id1WasUnsubscribed !id3WasUnsubscribed - where: - subscriberNode << [ - new SubscriberNode(), - new OptimizedSubscriberNode() - ] } - def "should subscribe and unsubscribe shared topic correctly correctly"(SubscriberTreeBase subscriberNode) { + def "should subscribe and unsubscribe shared topic correctly correctly"() { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriberTree.subscribe(makeUser("id1"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id2"), makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(makeUser("id3"), makeSharedSubscription('$share/group1/topic/name1')) @@ -900,16 +538,11 @@ class SubscriberTreeTest extends UnitSpecification { matched.size() == 0 id1WasUnsubscribed !id3WasUnsubscribed - where: - subscriberNode << [ - new SubscriberNode(), - new OptimizedSubscriberNode() - ] } - def "should replace the same subscriptions"(SubscriberTreeBase subscriberNode) { + def "should replace the same subscriptions"() { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() def owner1 = makeUser("id1") def originalSub = makeSubscription('topic/name1') def replacementSub = makeSubscription('topic/name1') @@ -933,16 +566,11 @@ class SubscriberTreeTest extends UnitSpecification { matched.first().subscription() == replacementSub previous != null previous.subscription() == originalSub - where: - subscriberNode << [ - new SubscriberNode(), - new OptimizedSubscriberNode() - ] } - def "should extend shared subscription group on multiply subscribing by the same topic"(SubscriberTreeBase subscriberNode) { + def "should extend shared subscription group on multiply subscribing by the same topic"() { given: - ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree(subscriberNode) + ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() def owner1 = makeUser("id1") def owner2 = makeUser("id2") subscriberTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1')) @@ -990,11 +618,6 @@ class SubscriberTreeTest extends UnitSpecification { then: matched.size() == 1 matched.first().user() == owner1 - where: - subscriberNode << [ - new SubscriberNode(), - new OptimizedSubscriberNode() - ] } static def makeUser(String id) { From 365173f02091081684b334054fd9704efb20f7db Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 10:52:45 +0100 Subject: [PATCH 4/7] Revert unnecessary changes --- .../application/config/MqttBrokerSpringConfig.java | 5 ++--- .../mqtt/service/impl/InMemorySubscriptionService.java | 3 +-- .../subscriber/tree/ConcurrentSubscriberTree.java | 10 ++-------- .../model/subscriber/tree/OptimizedSubscriberNode.java | 4 ++-- .../mqtt/model/subscriber/tree/SubscriberNode.java | 4 ++-- .../mqtt/model/subscriber/tree/SubscriberTreeBase.java | 5 +++-- 6 files changed, 12 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java index 1ec60b01..0382c8dd 100644 --- a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java +++ b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java @@ -8,7 +8,6 @@ import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree; import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode; import javasabr.mqtt.model.subscriber.tree.SubscriberNode; -import javasabr.mqtt.model.subscriber.tree.SubscriberTreeBase; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.MqttConnectionFactory; import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler; @@ -69,8 +68,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; -@SuppressWarnings("unused") @CustomLog +@SuppressWarnings("unused") @Configuration(proxyBeanMethods = false) public class MqttBrokerSpringConfig { @@ -104,7 +103,7 @@ AuthenticationService authenticationService( @Bean ConcurrentSubscriberTree subscriberTree() { - return new ConcurrentSubscriberTree(new SubscriberNode()); + return new ConcurrentSubscriberTree(new OptimizedSubscriberNode()); } @Bean diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java index 92241322..7e194e9f 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java @@ -11,7 +11,6 @@ import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscriber.Subscriber; import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree; -import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode; import javasabr.mqtt.model.subscription.Subscription; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; @@ -37,7 +36,7 @@ public class InMemorySubscriptionService implements SubscriptionService { ConcurrentSubscriberTree subscriberTree; public InMemorySubscriptionService() { - this.subscriberTree = new ConcurrentSubscriberTree(new OptimizedSubscriberNode()); + this.subscriberTree = new ConcurrentSubscriberTree(); } @Override diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java index 2c925ec4..8078eb6e 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java @@ -20,7 +20,7 @@ public class ConcurrentSubscriberTree implements ThreadSafe { SubscriberTreeBase rootNode; public ConcurrentSubscriberTree() { - rootNode = new OptimizedSubscriberNode(); + this.rootNode = new OptimizedSubscriberNode(); } @Nullable @@ -34,13 +34,7 @@ public boolean unsubscribe(MqttUser user, TopicFilter topicFilter) { public Array matches(TopicName topicName) { var resultArray = MutableArray.ofType(SingleSubscriber.class); - matchesTo(resultArray, topicName); - return resultArray; - } - - public MutableArray matchesTo(MutableArray container, TopicName topicName) { - var resultArray = MutableArray.ofType(SingleSubscriber.class); - rootNode.matchesTo(0, topicName, topicName.levelsCount() - 1, container); + rootNode.matchesTo(0, topicName, topicName.levelsCount() - 1, resultArray); return resultArray; } } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java index 8ce82f12..8a2fe558 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -77,7 +77,7 @@ private static boolean removeDuplicateWithLowerQoS( * @return the previous subscription from the same owner */ @Nullable - public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { + protected SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter); } @@ -85,7 +85,7 @@ public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscr return childNode.subscribe(level + 1, owner, subscription, topicFilter); } - public boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { + protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return removeSubscriber(subscribers(), owner, topicFilter); } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java index 248acbcb..1ad3b56b 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java @@ -39,7 +39,7 @@ public class SubscriberNode extends SubscriberTreeBase { * @return the previous subscription from the same owner */ @Nullable - public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { + protected SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter); } @@ -47,7 +47,7 @@ public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscr return childNode.subscribe(level + 1, owner, subscription, topicFilter); } - public boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { + protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return removeSubscriber(subscribers(), owner, topicFilter); } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java index 01f9f083..d5438ec1 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java @@ -27,9 +27,10 @@ protected abstract void matchesTo( int lastLevel, MutableArray container); - public abstract boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter); + protected abstract boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter); - public abstract SingleSubscriber subscribe( + @Nullable + protected abstract SingleSubscriber subscribe( int level, MqttUser owner, Subscription subscription, From 42b892d029ca213fbc1b5d3878745515777ca741 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 13:20:17 +0100 Subject: [PATCH 5/7] Refactoring of removeDuplicateWithLowerQoS --- .../tree/OptimizedSubscriberNode.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java index 8a2fe558..d3508bb0 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -31,7 +31,7 @@ public class OptimizedSubscriberNode extends SubscriberTreeBase { DebugUtils.registerIncludedFields("childNodes", "subscribers"); } - protected static void appendSubscribersTo( + private void appendSubscribersTo( MutableArray result, OptimizedSubscriberNode subscriberNode) { LockableArray subscribers = subscriberNode.subscribers(); @@ -41,31 +41,28 @@ protected static void appendSubscribersTo( long stamp = subscribers.readLock(); try { for (Subscriber subscriber : subscribers) { - SingleSubscriber singleSubscriber = subscriber.resolveSingle(); - if (removeDuplicateWithLowerQoS(result, singleSubscriber)) { - result.add(singleSubscriber); - } + addOrReplaceIfLowerQos(result, subscriber); } } finally { subscribers.readUnlock(stamp); } } - private static boolean removeDuplicateWithLowerQoS( + private static void addOrReplaceIfLowerQos( MutableArray result, - SingleSubscriber candidate) { - int found = result.indexOf(SingleSubscriber::user, candidate.user()); + Subscriber subscriber) { + SingleSubscriber subscriberFromNode = subscriber.resolveSingle(); + int found = result.indexOf(SingleSubscriber::user, subscriberFromNode.user()); if (found == -1) { - return true; + result.add(subscriberFromNode); + return; } - QoS candidateQos = candidate.qos(); - SingleSubscriber exist = result.get(found); - QoS existeQos = exist.qos(); - if (existeQos.ordinal() < candidateQos.ordinal()) { + QoS existedQos = result.get(found).qos(); + QoS candidateQos = subscriberFromNode.qos(); + if (existedQos.ordinal() < candidateQos.ordinal()) { result.remove(found); - return true; + result.add(subscriberFromNode); } - return false; } @Nullable From e07ce615ed26771f132a6104aee9278809bd4988 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 13:33:10 +0100 Subject: [PATCH 6/7] Merge collectWildcardMatches with collectSegmentMatches --- .../tree/OptimizedSubscriberNode.java | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java index d3508bb0..695bf236 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -31,9 +31,7 @@ public class OptimizedSubscriberNode extends SubscriberTreeBase { DebugUtils.registerIncludedFields("childNodes", "subscribers"); } - private void appendSubscribersTo( - MutableArray result, - OptimizedSubscriberNode subscriberNode) { + private void appendSubscribersTo(MutableArray result, OptimizedSubscriberNode subscriberNode) { LockableArray subscribers = subscriberNode.subscribers(); if (subscribers == null) { return; @@ -48,9 +46,7 @@ private void appendSubscribersTo( } } - private static void addOrReplaceIfLowerQos( - MutableArray result, - Subscriber subscriber) { + private static void addOrReplaceIfLowerQos(MutableArray result, Subscriber subscriber) { SingleSubscriber subscriberFromNode = subscriber.resolveSingle(); int found = result.indexOf(SingleSubscriber::user, subscriberFromNode.user()); if (found == -1) { @@ -99,45 +95,30 @@ protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableA try { collectSegmentMatches(nodes, topicName.segment(level), level, topicName, lastLevel, container); collectSegmentMatches(nodes, TopicFilter.SINGLE_LEVEL_WILDCARD, level, topicName, lastLevel, container); - collectWildcardMatches(nodes, container); + collectSegmentMatches(nodes, TopicFilter.MULTI_LEVEL_WILDCARD, level, topicName, lastLevel, container); } finally { nodes.readUnlock(stamp); } } private void collectSegmentMatches( - @Nullable LockableRefToRefDictionary childNodes, + LockableRefToRefDictionary childNodes, String segment, int level, TopicName topicName, int lastLevel, MutableArray result) { - if (childNodes == null) { - return; - } OptimizedSubscriberNode subscriberNode = childNodes.get(segment); if (subscriberNode == null) { return; } - if (level == lastLevel) { + if (level == lastLevel || TopicFilter.MULTI_LEVEL_WILDCARD.equals(segment)) { appendSubscribersTo(result, subscriberNode); } else if (level < lastLevel) { subscriberNode.matchesTo(level + 1, topicName, lastLevel, result); } } - private void collectWildcardMatches( - @Nullable LockableRefToRefDictionary childNodes, - MutableArray result) { - if (childNodes == null) { - return; - } - OptimizedSubscriberNode subscriberNode = childNodes.get(TopicFilter.MULTI_LEVEL_WILDCARD); - if (subscriberNode != null) { - appendSubscribersTo(result, subscriberNode); - } - } - private OptimizedSubscriberNode getOrCreateChildNode(String segment) { LockableRefToRefDictionary childNodes = getOrCreateChildNodes(); long stamp = childNodes.readLock(); From be25c8c8a18ffc97c7409de7b50bce8d75e8071a Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Tue, 2 Dec 2025 18:48:20 +0100 Subject: [PATCH 7/7] Switch to HashBasedDictionary in order to reduce algorithm complexity --- .../tree/SubscriberNodeBenchmark.java | 87 +++++++++++++++---- .../tree/ConcurrentSubscriberTree.java | 8 +- .../tree/OptimizedSubscriberNode.java | 51 ++++++----- .../model/subscriber/tree/SubscriberNode.java | 10 +++ .../subscriber/tree/SubscriberTreeBase.java | 3 +- 5 files changed, 118 insertions(+), 41 deletions(-) diff --git a/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java b/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java index 8612c13b..86f18890 100644 --- a/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java +++ b/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java @@ -1,6 +1,7 @@ package javasabr.mqtt.model.subscriber.tree; import java.util.concurrent.TimeUnit; +import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscription.Subscription; import javasabr.mqtt.model.subscription.TestMqttUser; @@ -9,14 +10,19 @@ import javasabr.mqtt.model.topic.tree.SubscriberTreeTest; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.dictionary.DictionaryFactory; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import org.jspecify.annotations.NonNull; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; @@ -25,20 +31,26 @@ import org.openjdk.jmh.infra.Blackhole; @Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 20, time = 2, timeUnit = TimeUnit.SECONDS) @Fork(value = 1) @BenchmarkMode(Mode.Throughput) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Benchmark) -@Threads(10) +@Threads(100) public class SubscriberNodeBenchmark { + @Param({"100"}) + public int subscribeFrequency; + private SubscriberNode originalImplementation; private OptimizedSubscriberNode optimizedImplementation; private TopicName[] topicNames; private int[] lastLevels; + private TopicFilter[] topicFilters; + private Subscription[] subscriptions; + private static final String[] TEST_TOPIC_NAMES = { "home/kitchen/sensor/temp", "home/livingroom/light/state", @@ -82,14 +94,22 @@ public void setup() { lastLevels[i] = topicNames[i].levelsCount() - 1; } + topicFilters = new TopicFilter[TEST_FILTERS.length]; + subscriptions = new Subscription[TEST_FILTERS.length]; + originalImplementation = new SubscriberNode(); optimizedImplementation = new OptimizedSubscriberNode(); - for (String rawTopic : TEST_FILTERS) { + + for (int i = 0; i < TEST_FILTERS.length; i++) { + String rawTopic = TEST_FILTERS[i]; TopicFilter topicFilter = new TopicFilter(rawTopic); - Subscription subscription = SubscriberTreeTest.makeSubscription(rawTopic); - TestMqttUser owner = new TestMqttUser("id"); - originalImplementation.subscribe(0, owner, subscription, topicFilter); - optimizedImplementation.subscribe(0, owner, subscription, topicFilter); + TestMqttUser owner = new TestMqttUser("init_user_" + i); + + topicFilters[i] = topicFilter; + subscriptions[i] = SubscriberTreeTest.makeSubscription(rawTopic); + + originalImplementation.subscribe(0, owner, subscriptions[i], topicFilter); + optimizedImplementation.subscribe(0, owner, subscriptions[i], topicFilter); } } @@ -97,25 +117,62 @@ public void setup() { public static class ThreadState { int index = 0; public MutableArray<@NonNull SingleSubscriber> container; - + public MutableRefToRefDictionary mapContainer; + private TestMqttUser userForSubscription; @Setup(Level.Trial) public void setupThread() { container = ArrayFactory.mutableArray(SingleSubscriber.class); + mapContainer = DictionaryFactory.mutableRefToRefDictionary(); + userForSubscription = new TestMqttUser("thread_user_" + Thread.currentThread().getId()); } } + @Group("original") @Benchmark - public void originalImplementation(ThreadState state, Blackhole bh) { - int i = state.index % topicNames.length; - originalImplementation.matchesTo(0, topicNames[i], lastLevels[i], state.container); + public void originalImplementationSubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + originalImplementation.subscribe(0, state.userForSubscription, subscriptions[topicFilterIndex], topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + state.index++; + } + @Group("original") + @Benchmark + public void originalImplementationUnsubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + originalImplementation.unsubscribe(0, state.userForSubscription, topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + } + @Group("original") + @GroupThreads(5) + @Benchmark + public void originalImplementationMatchTo(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicNameIndex = state.index % topicNames.length; + originalImplementation.matchesTo(0, topicNames[topicNameIndex], lastLevels[topicNameIndex], state.container); bh.consume(state.container); state.index++; } + @Group("optimized") + @Benchmark + public void optimizedImplementationSubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + optimizedImplementation.subscribe(0, state.userForSubscription, subscriptions[topicFilterIndex], topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + state.index++; + } + @Group("optimized") + @Benchmark + public void optimizedImplementationUnsubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + optimizedImplementation.unsubscribe(0, state.userForSubscription, topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + } + @Group("optimized") + @GroupThreads(5) @Benchmark - public void optimizedImplementation(ThreadState state, Blackhole bh) { - int i = state.index % topicNames.length; - optimizedImplementation.matchesTo(0, topicNames[i], lastLevels[i], state.container); + public void optimizedImplementationMatchTo(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicNameIndex = state.index % topicNames.length; + optimizedImplementation.matchesTo(0, topicNames[topicNameIndex], lastLevels[topicNameIndex], state.mapContainer); bh.consume(state.container); state.index++; } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java index 8078eb6e..92c2fdba 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java @@ -6,7 +6,9 @@ import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.Array; -import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.dictionary.DictionaryFactory; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import javasabr.rlib.common.ThreadSafe; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; @@ -33,8 +35,8 @@ public boolean unsubscribe(MqttUser user, TopicFilter topicFilter) { } public Array matches(TopicName topicName) { - var resultArray = MutableArray.ofType(SingleSubscriber.class); + MutableRefToRefDictionary resultArray = DictionaryFactory.mutableRefToRefDictionary(); rootNode.matchesTo(0, topicName, topicName.levelsCount() - 1, resultArray); - return resultArray; + return resultArray.values(ArrayFactory.mutableArray(SingleSubscriber.class)); } } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java index 695bf236..0b59f544 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -11,9 +11,9 @@ import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.LockableArray; -import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.collections.dictionary.DictionaryFactory; import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; @@ -31,7 +31,7 @@ public class OptimizedSubscriberNode extends SubscriberTreeBase { DebugUtils.registerIncludedFields("childNodes", "subscribers"); } - private void appendSubscribersTo(MutableArray result, OptimizedSubscriberNode subscriberNode) { + private void appendSubscribersTo(MutableRefToRefDictionary result, OptimizedSubscriberNode subscriberNode) { LockableArray subscribers = subscriberNode.subscribers(); if (subscribers == null) { return; @@ -46,18 +46,17 @@ private void appendSubscribersTo(MutableArray result, Optimize } } - private static void addOrReplaceIfLowerQos(MutableArray result, Subscriber subscriber) { + private static void addOrReplaceIfLowerQos(MutableRefToRefDictionary result, Subscriber subscriber) { SingleSubscriber subscriberFromNode = subscriber.resolveSingle(); - int found = result.indexOf(SingleSubscriber::user, subscriberFromNode.user()); - if (found == -1) { - result.add(subscriberFromNode); + SingleSubscriber singleSubscriber = result.get(subscriberFromNode.user()); + if (singleSubscriber == null) { + result.put(subscriberFromNode.user(), subscriberFromNode); return; } - QoS existedQos = result.get(found).qos(); + QoS existedQos = singleSubscriber.qos(); QoS candidateQos = subscriberFromNode.qos(); if (existedQos.ordinal() < candidateQos.ordinal()) { - result.remove(found); - result.add(subscriberFromNode); + result.put(subscriberFromNode.user(), subscriberFromNode); } } @@ -86,29 +85,23 @@ protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter return childNode.unsubscribe(level + 1, owner, topicFilter); } - protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableArray container) { + protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableRefToRefDictionary container) { LockableRefToRefDictionary nodes = childNodes(); if (nodes == null) { return; } - long stamp = nodes.readLock(); - try { - collectSegmentMatches(nodes, topicName.segment(level), level, topicName, lastLevel, container); - collectSegmentMatches(nodes, TopicFilter.SINGLE_LEVEL_WILDCARD, level, topicName, lastLevel, container); - collectSegmentMatches(nodes, TopicFilter.MULTI_LEVEL_WILDCARD, level, topicName, lastLevel, container); - } finally { - nodes.readUnlock(stamp); - } + collectMatchingSubscribers(topicName.segment(level), level, topicName, lastLevel, container); + collectMatchingSubscribers(TopicFilter.SINGLE_LEVEL_WILDCARD, level, topicName, lastLevel, container); + collectMatchingSubscribers(TopicFilter.MULTI_LEVEL_WILDCARD, level, topicName, lastLevel, container); } - private void collectSegmentMatches( - LockableRefToRefDictionary childNodes, + private void collectMatchingSubscribers( String segment, int level, TopicName topicName, int lastLevel, - MutableArray result) { - OptimizedSubscriberNode subscriberNode = childNodes.get(segment); + MutableRefToRefDictionary result) { + OptimizedSubscriberNode subscriberNode = childNode(segment); if (subscriberNode == null) { return; } @@ -168,6 +161,20 @@ private LockableArray getOrCreateSubscribers() { } } + @Nullable + private OptimizedSubscriberNode childNode(String segment) { + LockableRefToRefDictionary childNodes = childNodes(); + if (childNodes == null) { + return null; + } + long stamp = childNodes.readLock(); + try { + return childNodes.get(segment); + } finally { + childNodes.readUnlock(stamp); + } + } + @Override public String toString() { return DebugUtils.toJsonString(this); diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java index 1ad3b56b..09abcf1c 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java @@ -13,6 +13,7 @@ import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.collections.dictionary.DictionaryFactory; import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; @@ -47,6 +48,15 @@ protected SingleSubscriber subscribe(int level, MqttUser owner, Subscription sub return childNode.subscribe(level + 1, owner, subscription, topicFilter); } + @Override + protected void matchesTo( + int level, + TopicName topicName, + int lastLevel, + MutableRefToRefDictionary container) { + + } + protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return removeSubscriber(subscribers(), owner, topicFilter); diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java index d5438ec1..2923ff59 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java @@ -12,6 +12,7 @@ import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.LockableArray; import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; @@ -25,7 +26,7 @@ protected abstract void matchesTo( int level, TopicName topicName, int lastLevel, - MutableArray container); + MutableRefToRefDictionary container); protected abstract boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter);