Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
import javasabr.mqtt.network.MqttClientFactory;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.MqttConnectionFactory;
Expand Down Expand Up @@ -178,8 +179,9 @@ MqttInMessageHandler disconnectMqttInMessageHandler(MessageOutFactoryService mes
MqttInMessageHandler subscribeMqttInMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
TopicService topicService,
PublishDeliveringService publishDeliveringService) {
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService, publishDeliveringService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.model.topic.tree;
package javasabr.mqtt.model.subscribtion.tree;

import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscribtion.Subscription;
Expand All @@ -13,12 +13,12 @@
import org.jspecify.annotations.Nullable;

@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ConcurrentTopicTree implements ThreadSafe {
public class ConcurrentSubscriptionTree implements ThreadSafe {

TopicNode rootNode;
TopicFilterNode rootNode;

public ConcurrentTopicTree() {
this.rootNode = new TopicNode();
public ConcurrentSubscriptionTree() {
this.rootNode = new TopicFilterNode();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.model.topic.tree;
package javasabr.mqtt.model.subscribtion.tree;

import java.util.function.Supplier;
import javasabr.mqtt.base.util.DebugUtils;
Expand All @@ -22,16 +22,16 @@
@Getter(AccessLevel.PACKAGE)
@Accessors(fluent = true, chain = false)
@FieldDefaults(level = AccessLevel.PRIVATE)
class TopicNode extends TopicTreeBase {
class TopicFilterNode extends TopicFilterTreeBase {

private final static Supplier<TopicNode> TOPIC_NODE_FACTORY = TopicNode::new;
private final static Supplier<TopicFilterNode> TOPIC_NODE_FACTORY = TopicFilterNode::new;

static {
DebugUtils.registerIncludedFields("childNodes", "subscribers");
}

@Nullable
volatile LockableRefToRefDictionary<String, TopicNode> childNodes;
volatile LockableRefToRefDictionary<String, TopicFilterNode> childNodes;
@Nullable
volatile LockableArray<Subscriber> subscribers;

Expand All @@ -43,15 +43,15 @@ public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscripti
if (level == topicFilter.levelsCount()) {
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
}
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
}

public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) {
if (level == topicFilter.levelsCount()) {
return removeSubscriber(subscribers(), owner, topicFilter);
}
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
return childNode.unsubscribe(level + 1, owner, topicFilter);
}

Expand All @@ -67,14 +67,14 @@ private void exactlyTopicMatch(
int lastLevel,
MutableArray<SingleSubscriber> result) {
String segment = topicName.segment(level);
TopicNode topicNode = childNode(segment);
if (topicNode == null) {
TopicFilterNode topicFilterNode = childNode(segment);
if (topicFilterNode == null) {
return;
}
if (level == lastLevel) {
appendSubscribersTo(result, topicNode);
appendSubscribersTo(result, topicFilterNode);
} else if (level < lastLevel) {
topicNode.matchesTo(level + 1, topicName, lastLevel, result);
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
}
}

Expand All @@ -83,31 +83,31 @@ private void singleWildcardTopicMatch(
TopicName topicName,
int lastLevel,
MutableArray<SingleSubscriber> result) {
TopicNode topicNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
if (topicNode == null) {
TopicFilterNode topicFilterNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
if (topicFilterNode == null) {
return;
}
if (level == lastLevel) {
appendSubscribersTo(result, topicNode);
appendSubscribersTo(result, topicFilterNode);
} else if (level < lastLevel) {
topicNode.matchesTo(level + 1, topicName, lastLevel, result);
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
}
}

private void multiWildcardTopicMatch(MutableArray<SingleSubscriber> result) {
TopicNode topicNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
if (topicNode != null) {
appendSubscribersTo(result, topicNode);
TopicFilterNode topicFilterNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
if (topicFilterNode != null) {
appendSubscribersTo(result, topicFilterNode);
}
}

private TopicNode getOrCreateChildNode(String segment) {
LockableRefToRefDictionary<String, TopicNode> childNodes = getOrCreateChildNodes();
private TopicFilterNode getOrCreateChildNode(String segment) {
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = getOrCreateChildNodes();
long stamp = childNodes.readLock();
try {
TopicNode topicNode = childNodes.get(segment);
if (topicNode != null) {
return topicNode;
TopicFilterNode topicFilterNode = childNodes.get(segment);
if (topicFilterNode != null) {
return topicFilterNode;
}
} finally {
childNodes.readUnlock(stamp);
Expand All @@ -122,8 +122,8 @@ private TopicNode getOrCreateChildNode(String segment) {
}

@Nullable
private TopicNode childNode(String segment) {
LockableRefToRefDictionary<String, TopicNode> childNodes = childNodes();
private TopicFilterNode childNode(String segment) {
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = childNodes();
if (childNodes == null) {
return null;
}
Expand All @@ -135,7 +135,7 @@ private TopicNode childNode(String segment) {
}
}

private LockableRefToRefDictionary<String, TopicNode> getOrCreateChildNodes() {
private LockableRefToRefDictionary<String, TopicFilterNode> getOrCreateChildNodes() {
if (childNodes == null) {
synchronized (this) {
if (childNodes == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.model.topic.tree;
package javasabr.mqtt.model.subscribtion.tree;

import java.util.Objects;
import javasabr.mqtt.model.QoS;
Expand All @@ -18,7 +18,7 @@

@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
abstract class TopicTreeBase {
abstract class TopicFilterTreeBase {

/**
* @return previous subscriber with the same owner
Expand Down Expand Up @@ -66,7 +66,7 @@ private static void addSharedSubscriber(
String group = sharedTopicFilter.shareName();
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
.iterations()
.findAny(group, TopicTreeBase::isSharedSubscriberWithGroup);
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);

if (sharedSubscriber == null) {
sharedSubscriber = new SharedSubscriber(sharedTopicFilter);
Expand All @@ -76,8 +76,8 @@ private static void addSharedSubscriber(
sharedSubscriber.addSubscriber(new SingleSubscriber(owner, subscription));
}

protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicNode topicNode) {
LockableArray<Subscriber> subscribers = topicNode.subscribers();
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicFilterNode topicFilterNode) {
LockableArray<Subscriber> subscribers = topicFilterNode.subscribers();
if (subscribers == null) {
return;
}
Expand Down Expand Up @@ -125,7 +125,7 @@ private static boolean removeSharedSubscriber(
String group = sharedTopicFilter.shareName();
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
.iterations()
.findAny(group, TopicTreeBase::isSharedSubscriberWithGroup);
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);
if (sharedSubscriber != null) {
boolean removed = sharedSubscriber.removeSubscriberWithOwner(owner);
if (sharedSubscriber.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
@NullMarked
package javasabr.mqtt.model.subscribtion.tree;

import org.jspecify.annotations.NullMarked;
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
public class TopicFilter extends AbstractTopic {

public static final String MULTI_LEVEL_WILDCARD = "#";
public static final char MULTI_LEVEL_WILDCARD_CHAR = '#';
public static final char MULTI_LEVEL_WILDCARD_CHAR = MULTI_LEVEL_WILDCARD.charAt(0);
public static final String SINGLE_LEVEL_WILDCARD = "+";
public static final char SINGLE_LEVEL_WILDCARD_CHAR = '+';
public static final char SINGLE_LEVEL_WILDCARD_CHAR = SINGLE_LEVEL_WILDCARD.charAt(0);
public static final String SPECIAL = "$";

public static final TopicFilter INVALID_TOPIC_FILTER = new TopicFilter("$invalid$") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package javasabr.mqtt.model.topic.tree;

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.rlib.common.ThreadSafe;
import lombok.AccessLevel;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ConcurrentRetainedMessageTree implements ThreadSafe {

TopicMessageNode rootNode;

public ConcurrentRetainedMessageTree() {
this.rootNode = new TopicMessageNode();
}

public void retainMessage(Publish message) {
rootNode.retainMessage(0, message, message.topicName());
}

public @Nullable Publish getRetainedMessage(TopicName topicName) {
return rootNode.getRetainedMessage(0, topicName);
}

public @Nullable Publish getRetainedMessage(TopicFilter topicFilter) {
return rootNode.getRetainedMessage(0, topicFilter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package javasabr.mqtt.model.topic.tree;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javasabr.mqtt.base.util.DebugUtils;
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.rlib.collections.dictionary.DictionaryFactory;
import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

@Getter(AccessLevel.PACKAGE)
@Accessors(fluent = true, chain = false)
@FieldDefaults(level = AccessLevel.PRIVATE)
class TopicMessageNode {

private final static Supplier<TopicMessageNode> TOPIC_NODE_FACTORY = TopicMessageNode::new;

static {
DebugUtils.registerIncludedFields("childNodes", "retainedMessage");
}

@Nullable
volatile LockableRefToRefDictionary<String, TopicMessageNode> childNodes;
final AtomicReference<@Nullable Publish> retainedMessage = new AtomicReference<>();

public void retainMessage(int level, Publish message, TopicName topicFilter) {
if (level + 1 == topicFilter.levelsCount()) {
retainedMessage.set(message);
return;
}
TopicMessageNode childNode = getOrCreateChildNode(topicFilter.segment(level));
childNode.retainMessage(level + 1, message, topicFilter);
}

@Nullable
public Publish getRetainedMessage(int level, TopicName topicName) {
if (level + 1 == topicName.levelsCount()) {
return retainedMessage.get();
}
TopicMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
return childNode.getRetainedMessage(level + 1, topicName);
}

@Nullable
public Publish getRetainedMessage(int level, TopicFilter topicName) {
if (level + 1 == topicName.levelsCount()) {
return retainedMessage.get();
}
TopicMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
return childNode.getRetainedMessage(level + 1, topicName);
}

private TopicMessageNode getOrCreateChildNode(String segment) {
LockableRefToRefDictionary<String, TopicMessageNode> childNodes = getOrCreateChildNodes();
long stamp = childNodes.readLock();
try {
TopicMessageNode topicFilterNode = childNodes.get(segment);
if (topicFilterNode != null) {
return topicFilterNode;
}
} finally {
childNodes.readUnlock(stamp);
}
stamp = childNodes.writeLock();
try {
return childNodes.getOrCompute(segment, TOPIC_NODE_FACTORY);
} finally {
childNodes.writeUnlock(stamp);
}
}

private LockableRefToRefDictionary<String, TopicMessageNode> getOrCreateChildNodes() {
if (childNodes == null) {
synchronized (this) {
if (childNodes == null) {
childNodes = DictionaryFactory.stampedLockBasedRefToRefDictionary();
}
}
}
//noinspection ConstantConditions
return childNodes;
}

@Override
public String toString() {
return DebugUtils.toJsonString(this);
}
}
Loading
Loading