diff --git a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java index eb4c4470..0382c8dd 100644 --- a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java +++ b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java @@ -5,6 +5,9 @@ import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttServerConnectionConfig; import javasabr.mqtt.model.QoS; +import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree; +import javasabr.mqtt.model.subscriber.tree.OptimizedSubscriberNode; +import javasabr.mqtt.model.subscriber.tree.SubscriberNode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.MqttConnectionFactory; import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler; @@ -66,6 +69,7 @@ import org.springframework.core.env.Environment; @CustomLog +@SuppressWarnings("unused") @Configuration(proxyBeanMethods = false) public class MqttBrokerSpringConfig { @@ -98,8 +102,13 @@ AuthenticationService authenticationService( } @Bean - SubscriptionService subscriptionService() { - return new InMemorySubscriptionService(); + ConcurrentSubscriberTree subscriberTree() { + return new ConcurrentSubscriberTree(new OptimizedSubscriberNode()); + } + + @Bean + SubscriptionService subscriptionService(ConcurrentSubscriberTree subscriberTree) { + return new InMemorySubscriptionService(subscriberTree); } @Bean diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 3a85dcfe..b29414ad 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -5,4 +5,8 @@ plugins { repositories { mavenCentral() gradlePluginPortal() -} \ No newline at end of file +} + +dependencies { + implementation 'me.champeau.jmh:jmh-gradle-plugin:0.7.3' +} diff --git a/buildSrc/src/main/groovy/configure-java.gradle b/buildSrc/src/main/groovy/configure-java.gradle index 78f290de..ceb86373 100644 --- a/buildSrc/src/main/groovy/configure-java.gradle +++ b/buildSrc/src/main/groovy/configure-java.gradle @@ -4,6 +4,7 @@ plugins { id("java") id("java-test-fixtures") id("configure-jacoco") + id("me.champeau.jmh") } java { @@ -16,6 +17,8 @@ dependencies { compileOnly libs.jspecify compileOnly libs.lombok annotationProcessor libs.lombok + jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.37' + jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.37' } test { @@ -36,4 +39,4 @@ tasks.withType(JavaCompile).configureEach { processResources { filter(ReplaceTokens, tokens: []) -} \ No newline at end of file +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java index fcc635ba..7e194e9f 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java @@ -22,12 +22,14 @@ import javasabr.rlib.collections.array.MutableArray; import lombok.AccessLevel; import lombok.CustomLog; +import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; /** * In memory subscription service based on {@link ConcurrentSubscriberTree} */ @CustomLog +@RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class InMemorySubscriptionService implements SubscriptionService { diff --git a/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java b/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java new file mode 100644 index 00000000..86f18890 --- /dev/null +++ b/model/src/jmh/java/javasabr/mqtt/model/subscriber/tree/SubscriberNodeBenchmark.java @@ -0,0 +1,179 @@ +package javasabr.mqtt.model.subscriber.tree; + +import java.util.concurrent.TimeUnit; +import javasabr.mqtt.model.MqttUser; +import javasabr.mqtt.model.subscriber.SingleSubscriber; +import javasabr.mqtt.model.subscription.Subscription; +import javasabr.mqtt.model.subscription.TestMqttUser; +import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; +import javasabr.mqtt.model.topic.tree.SubscriberTreeTest; +import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.dictionary.DictionaryFactory; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; +import org.jspecify.annotations.NonNull; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 20, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Benchmark) +@Threads(100) +public class SubscriberNodeBenchmark { + + @Param({"100"}) + public int subscribeFrequency; + + private SubscriberNode originalImplementation; + private OptimizedSubscriberNode optimizedImplementation; + + private TopicName[] topicNames; + private int[] lastLevels; + + private TopicFilter[] topicFilters; + private Subscription[] subscriptions; + + private static final String[] TEST_TOPIC_NAMES = { + "home/kitchen/sensor/temp", + "home/livingroom/light/state", + "system/events/alert/priority/low", + "command/reboot/device/101", + "$SYS/broker/clients/total", + "data", + "metrics/cpu/usage", + "user/id_12345/settings", + "long/path/with/many/segments/is/unlikely/but/possible", + "a/b/c/d/e/f/g", + "short" + }; + + private static final String[] TEST_FILTERS = { + "home/#", + "home/kitchen/sensor/temp", + "home/+/sensor/temp", + "home/+/+/temp", + "system/events/+/priority/#", + "$SYS/#", + "$SYS/broker/clients/+", + "#", + "+", + "+/+/+/+", + "a/b/c/d/e/f/g/#", + "+/kitchen/sensor/temp", + "data", + "data/#", + "command/+", + "system/events/#", + "user/+/settings" + }; + + @Setup + public void setup() { + topicNames = new TopicName[TEST_TOPIC_NAMES.length]; + lastLevels = new int[TEST_TOPIC_NAMES.length]; + for (int i = 0; i < TEST_TOPIC_NAMES.length; i++) { + topicNames[i] = new TopicName(TEST_TOPIC_NAMES[i]); + lastLevels[i] = topicNames[i].levelsCount() - 1; + } + + topicFilters = new TopicFilter[TEST_FILTERS.length]; + subscriptions = new Subscription[TEST_FILTERS.length]; + + originalImplementation = new SubscriberNode(); + optimizedImplementation = new OptimizedSubscriberNode(); + + for (int i = 0; i < TEST_FILTERS.length; i++) { + String rawTopic = TEST_FILTERS[i]; + TopicFilter topicFilter = new TopicFilter(rawTopic); + TestMqttUser owner = new TestMqttUser("init_user_" + i); + + topicFilters[i] = topicFilter; + subscriptions[i] = SubscriberTreeTest.makeSubscription(rawTopic); + + originalImplementation.subscribe(0, owner, subscriptions[i], topicFilter); + optimizedImplementation.subscribe(0, owner, subscriptions[i], topicFilter); + } + } + + @State(Scope.Thread) + public static class ThreadState { + int index = 0; + public MutableArray<@NonNull SingleSubscriber> container; + public MutableRefToRefDictionary mapContainer; + private TestMqttUser userForSubscription; + @Setup(Level.Trial) + public void setupThread() { + container = ArrayFactory.mutableArray(SingleSubscriber.class); + mapContainer = DictionaryFactory.mutableRefToRefDictionary(); + userForSubscription = new TestMqttUser("thread_user_" + Thread.currentThread().getId()); + } + } + + @Group("original") + @Benchmark + public void originalImplementationSubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + originalImplementation.subscribe(0, state.userForSubscription, subscriptions[topicFilterIndex], topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + state.index++; + } + @Group("original") + @Benchmark + public void originalImplementationUnsubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + originalImplementation.unsubscribe(0, state.userForSubscription, topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + } + @Group("original") + @GroupThreads(5) + @Benchmark + public void originalImplementationMatchTo(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicNameIndex = state.index % topicNames.length; + originalImplementation.matchesTo(0, topicNames[topicNameIndex], lastLevels[topicNameIndex], state.container); + bh.consume(state.container); + state.index++; + } + + @Group("optimized") + @Benchmark + public void optimizedImplementationSubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + optimizedImplementation.subscribe(0, state.userForSubscription, subscriptions[topicFilterIndex], topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + state.index++; + } + @Group("optimized") + @Benchmark + public void optimizedImplementationUnsubscribe(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicFilterIndex = state.index % topicFilters.length; + optimizedImplementation.unsubscribe(0, state.userForSubscription, topicFilters[topicFilterIndex]); + bh.consume(topicFilterIndex); + } + @Group("optimized") + @GroupThreads(5) + @Benchmark + public void optimizedImplementationMatchTo(SubscriberNodeBenchmark benchmark, ThreadState state, Blackhole bh) { + int topicNameIndex = state.index % topicNames.length; + optimizedImplementation.matchesTo(0, topicNames[topicNameIndex], lastLevels[topicNameIndex], state.mapContainer); + bh.consume(state.container); + state.index++; + } +} diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java index 307db58c..92c2fdba 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java @@ -6,19 +6,23 @@ import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.Array; -import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.dictionary.DictionaryFactory; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import javasabr.rlib.common.ThreadSafe; import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; import org.jspecify.annotations.Nullable; +@RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class ConcurrentSubscriberTree implements ThreadSafe { - SubscriberNode rootNode; + SubscriberTreeBase rootNode; public ConcurrentSubscriberTree() { - this.rootNode = new SubscriberNode(); + this.rootNode = new OptimizedSubscriberNode(); } @Nullable @@ -31,14 +35,8 @@ public boolean unsubscribe(MqttUser user, TopicFilter topicFilter) { } public Array matches(TopicName topicName) { - var resultArray = MutableArray.ofType(SingleSubscriber.class); - matchesTo(resultArray, topicName); - return resultArray; - } - - public MutableArray matchesTo(MutableArray container, TopicName topicName) { - var resultArray = MutableArray.ofType(SingleSubscriber.class); - rootNode.matchesTo(0, topicName, topicName.levelsCount() - 1, container); - return resultArray; + MutableRefToRefDictionary resultArray = DictionaryFactory.mutableRefToRefDictionary(); + rootNode.matchesTo(0, topicName, topicName.levelsCount() - 1, resultArray); + return resultArray.values(ArrayFactory.mutableArray(SingleSubscriber.class)); } } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java new file mode 100644 index 00000000..0b59f544 --- /dev/null +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/OptimizedSubscriberNode.java @@ -0,0 +1,182 @@ +package javasabr.mqtt.model.subscriber.tree; + +import java.util.function.Supplier; +import javasabr.mqtt.base.util.DebugUtils; +import javasabr.mqtt.model.MqttUser; +import javasabr.mqtt.model.QoS; +import javasabr.mqtt.model.subscriber.SingleSubscriber; +import javasabr.mqtt.model.subscriber.Subscriber; +import javasabr.mqtt.model.subscription.Subscription; +import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; +import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.array.LockableArray; +import javasabr.rlib.collections.dictionary.DictionaryFactory; +import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; + +@Getter(AccessLevel.PACKAGE) +@Accessors(fluent = true, chain = false) +@FieldDefaults(level = AccessLevel.PRIVATE) +public class OptimizedSubscriberNode extends SubscriberTreeBase { + + private final static Supplier SUBSCRIBER_NODE_FACTORY = OptimizedSubscriberNode::new; + + static { + DebugUtils.registerIncludedFields("childNodes", "subscribers"); + } + + private void appendSubscribersTo(MutableRefToRefDictionary result, OptimizedSubscriberNode subscriberNode) { + LockableArray subscribers = subscriberNode.subscribers(); + if (subscribers == null) { + return; + } + long stamp = subscribers.readLock(); + try { + for (Subscriber subscriber : subscribers) { + addOrReplaceIfLowerQos(result, subscriber); + } + } finally { + subscribers.readUnlock(stamp); + } + } + + private static void addOrReplaceIfLowerQos(MutableRefToRefDictionary result, Subscriber subscriber) { + SingleSubscriber subscriberFromNode = subscriber.resolveSingle(); + SingleSubscriber singleSubscriber = result.get(subscriberFromNode.user()); + if (singleSubscriber == null) { + result.put(subscriberFromNode.user(), subscriberFromNode); + return; + } + QoS existedQos = singleSubscriber.qos(); + QoS candidateQos = subscriberFromNode.qos(); + if (existedQos.ordinal() < candidateQos.ordinal()) { + result.put(subscriberFromNode.user(), subscriberFromNode); + } + } + + @Nullable + volatile LockableRefToRefDictionary childNodes; + @Nullable + volatile LockableArray subscribers; + + /** + * @return the previous subscription from the same owner + */ + @Nullable + protected SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { + if (level == topicFilter.levelsCount()) { + return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter); + } + OptimizedSubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level)); + return childNode.subscribe(level + 1, owner, subscription, topicFilter); + } + + protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { + if (level == topicFilter.levelsCount()) { + return removeSubscriber(subscribers(), owner, topicFilter); + } + OptimizedSubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level)); + return childNode.unsubscribe(level + 1, owner, topicFilter); + } + + protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableRefToRefDictionary container) { + LockableRefToRefDictionary nodes = childNodes(); + if (nodes == null) { + return; + } + collectMatchingSubscribers(topicName.segment(level), level, topicName, lastLevel, container); + collectMatchingSubscribers(TopicFilter.SINGLE_LEVEL_WILDCARD, level, topicName, lastLevel, container); + collectMatchingSubscribers(TopicFilter.MULTI_LEVEL_WILDCARD, level, topicName, lastLevel, container); + } + + private void collectMatchingSubscribers( + String segment, + int level, + TopicName topicName, + int lastLevel, + MutableRefToRefDictionary result) { + OptimizedSubscriberNode subscriberNode = childNode(segment); + if (subscriberNode == null) { + return; + } + if (level == lastLevel || TopicFilter.MULTI_LEVEL_WILDCARD.equals(segment)) { + appendSubscribersTo(result, subscriberNode); + } else if (level < lastLevel) { + subscriberNode.matchesTo(level + 1, topicName, lastLevel, result); + } + } + + private OptimizedSubscriberNode getOrCreateChildNode(String segment) { + LockableRefToRefDictionary childNodes = getOrCreateChildNodes(); + long stamp = childNodes.readLock(); + try { + OptimizedSubscriberNode subscriberNode = childNodes.get(segment); + if (subscriberNode != null) { + return subscriberNode; + } + } finally { + childNodes.readUnlock(stamp); + } + stamp = childNodes.writeLock(); + try { + return childNodes.getOrCompute(segment, SUBSCRIBER_NODE_FACTORY); + } finally { + childNodes.writeUnlock(stamp); + } + } + + private LockableRefToRefDictionary getOrCreateChildNodes() { + LockableRefToRefDictionary current = childNodes; + if (current != null) { + return current; + } + synchronized (this) { + current = childNodes; + if (current == null) { + current = DictionaryFactory.stampedLockBasedRefToRefDictionary(); + childNodes = current; + } + return current; + } + } + + private LockableArray getOrCreateSubscribers() { + LockableArray current = subscribers; + if (current != null) { + return current; + } + synchronized (this) { + current = subscribers; + if (current == null) { + current = ArrayFactory.stampedLockBasedArray(Subscriber.class); + subscribers = current; + } + return current; + } + } + + @Nullable + private OptimizedSubscriberNode childNode(String segment) { + LockableRefToRefDictionary childNodes = childNodes(); + if (childNodes == null) { + return null; + } + long stamp = childNodes.readLock(); + try { + return childNodes.get(segment); + } finally { + childNodes.readUnlock(stamp); + } + } + + @Override + public String toString() { + return DebugUtils.toJsonString(this); + } +} diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java index 4a6579c2..09abcf1c 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java @@ -13,6 +13,7 @@ import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.collections.dictionary.DictionaryFactory; import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; @@ -22,7 +23,7 @@ @Getter(AccessLevel.PACKAGE) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PRIVATE) -class SubscriberNode extends SubscriberTreeBase { +public class SubscriberNode extends SubscriberTreeBase { private final static Supplier SUBSCRIBER_NODE_FACTORY = SubscriberNode::new; @@ -39,7 +40,7 @@ class SubscriberNode extends SubscriberTreeBase { * @return the previous subscription from the same owner */ @Nullable - public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { + protected SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter); } @@ -47,7 +48,16 @@ public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscr return childNode.subscribe(level + 1, owner, subscription, topicFilter); } - public boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { + @Override + protected void matchesTo( + int level, + TopicName topicName, + int lastLevel, + MutableRefToRefDictionary container) { + + } + + protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return removeSubscriber(subscribers(), owner, topicFilter); } @@ -61,11 +71,7 @@ protected void matchesTo(int level, TopicName topicName, int lastLevel, MutableA multiWildcardTopicMatch(container); } - private void exactlyTopicMatch( - int level, - TopicName topicName, - int lastLevel, - MutableArray result) { + private void exactlyTopicMatch(int level, TopicName topicName, int lastLevel, MutableArray result) { String segment = topicName.segment(level); SubscriberNode subscriberNode = childNode(segment); if (subscriberNode == null) { diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java index 972b696b..2923ff59 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java @@ -9,8 +9,10 @@ import javasabr.mqtt.model.subscription.Subscription; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.LockableArray; import javasabr.rlib.collections.array.MutableArray; +import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; @@ -18,7 +20,22 @@ @RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true) -abstract class SubscriberTreeBase { +public abstract class SubscriberTreeBase { + + protected abstract void matchesTo( + int level, + TopicName topicName, + int lastLevel, + MutableRefToRefDictionary container); + + protected abstract boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter); + + @Nullable + protected abstract SingleSubscriber subscribe( + int level, + MqttUser owner, + Subscription subscription, + TopicFilter topicFilter); /** * @return previous subscriber with the same user @@ -45,9 +62,7 @@ protected static SingleSubscriber addSubscriber( } @Nullable - private static SingleSubscriber removePreviousIfExist( - LockableArray subscribers, - MqttUser user) { + private static SingleSubscriber removePreviousIfExist(LockableArray subscribers, MqttUser user) { int index = subscribers.indexOf(Subscriber::resolveUser, user); if (index < 0) { return null; @@ -142,7 +157,8 @@ private static boolean isSharedSubscriberWithGroup(Subscriber subscriber, String } private static boolean removeDuplicateWithLowerQoS( - MutableArray result, SingleSubscriber candidate) { + MutableArray result, + SingleSubscriber candidate) { int found = result.indexOf(SingleSubscriber::user, candidate.user()); if (found == -1) { diff --git a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy index cfb17623..a9b0c225 100644 --- a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy +++ b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy @@ -624,7 +624,7 @@ class SubscriberTreeTest extends UnitSpecification { return new TestMqttUser(id) } - static def makeSubscription(String topicFilter) { + static Subscription makeSubscription(String topicFilter) { return new Subscription( TopicFilter.valueOf(topicFilter), MqttProperties.SUBSCRIPTION_ID_IS_NOT_SET,