Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion application/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ tasks.withType(GroovyCompile).configureEach {
configurations.each {
it.exclude group: "org.slf4j", module: "slf4j-log4j12"
it.exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
}
}

bootJar {
mainClass = 'javasabr.mqtt.broker.application.MqttBrokerApplication'
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ CredentialSource credentialSource(
@Bean
AuthenticationService authenticationService(
CredentialSource credentialSource,
@Value("${authentication.allow.anonymous:false}") boolean allowAnonymousAuth) {
@Value("${authentication.allow.anonymous:false}")
boolean allowAnonymousAuth) {
return new SimpleAuthenticationService(credentialSource, allowAnonymousAuth);
}

@Bean
SubscriptionService subscriptionService() {
return new InMemorySubscriptionService();
SubscriptionService subscriptionService(PublishDeliveringService publishDeliveringService) {
return new InMemorySubscriptionService(publishDeliveringService);
}

@Bean
Expand Down Expand Up @@ -153,10 +154,7 @@ MqttInMessageHandler publishMqttInMessageHandler(
PublishReceivingService publishReceivingService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new PublishMqttInMessageHandler(
publishReceivingService,
messageOutFactoryService,
topicService);
return new PublishMqttInMessageHandler(publishReceivingService, messageOutFactoryService, topicService);
}

@Bean
Expand Down Expand Up @@ -187,10 +185,7 @@ MqttInMessageHandler unsubscribeMqttInMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new UnsubscribeMqttInMessageHandler(
subscriptionService,
messageOutFactoryService,
topicService);
return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
}

@Bean
Expand All @@ -199,24 +194,18 @@ ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessa
}

@Bean
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
Expand All @@ -230,32 +219,23 @@ MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
return new Qos0MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
}

@Bean
MqttPublishInMessageHandler qos1MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
return new Qos1MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
}

@Bean
MqttPublishInMessageHandler qos2MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
return new Qos2MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
}

@Bean
Expand All @@ -280,22 +260,10 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
"mqtt.external.connection.max.message.size",
int.class,
MqttProperties.MAXIMUM_MESSAGE_SIZE_DEFAULT),
env.getProperty(
"mqtt.external.connection.max.string.length",
int.class,
MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty(
"mqtt.external.connection.max.binary.size",
int.class,
MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty(
"mqtt.external.connection.max.topic.levels",
int.class,
MqttProperties.MAXIMUM_TOPIC_LEVELS),
env.getProperty(
"mqtt.external.connection.min.keep.alive",
int.class,
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
env.getProperty("mqtt.external.connection.max.string.length", int.class, MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty("mqtt.external.connection.max.binary.size", int.class, MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty("mqtt.external.connection.max.topic.levels", int.class, MqttProperties.MAXIMUM_TOPIC_LEVELS),
env.getProperty("mqtt.external.connection.min.keep.alive", int.class, MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
env.getProperty(
"mqtt.external.connection.receive.maximum",
int.class,
Expand Down Expand Up @@ -360,7 +328,8 @@ NetworkMqttUserFactory externalClientFactory(NetworkMqttUserReleaseHandler exter
MqttConnectionFactory externalConnectionFactory(
MqttServerConnectionConfig externalServerConnectionConfig,
NetworkMqttUserFactory mqttUserFactory,
@Value("${mqtt.external.connection.max.packets.by.read:100}") int maxPacketsByRead) {
@Value("${mqtt.external.connection.max.packets.by.read:100}")
int maxPacketsByRead) {
return new DefaultMqttConnectionFactory(externalServerConnectionConfig, mqttUserFactory, maxPacketsByRead);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;

public interface PublishDeliveringService {

PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);

Array<PublishHandlingResult> deliverRetainedMessages(TopicFilter topicFilter, SingleSubscriber subscriber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
Expand All @@ -17,8 +16,6 @@
*/
public interface SubscriptionService {

NetworkMqttUser resolveClient(Subscriber subscriber);

default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.MutableArray;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.experimental.FieldDefaults;
Expand All @@ -18,6 +22,7 @@ public class DefaultPublishDeliveringService implements PublishDeliveringService

@Nullable
MqttPublishOutMessageHandler[] publishOutMessageHandlers;
ConcurrentRetainedMessageTree retainedMessageTree;

public DefaultPublishDeliveringService(
Collection<? extends MqttPublishOutMessageHandler> knownPublishOutHandlers) {
Expand All @@ -39,14 +44,15 @@ public DefaultPublishDeliveringService(
}
handlers[qos.level()] = knownPublishOutHandler;
}

this.retainedMessageTree = new ConcurrentRetainedMessageTree();
this.publishOutMessageHandlers = handlers;
log.info(publishOutMessageHandlers, DefaultPublishDeliveringService::buildServiceDescription);
}

@Override
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
try {
retainedMessageTree.retainMessage(publish);
//noinspection DataFlowIssue
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
} catch (IndexOutOfBoundsException | NullPointerException ex) {
Expand All @@ -55,6 +61,16 @@ public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber s
}
}

@Override
public Array<PublishHandlingResult> deliverRetainedMessages(TopicFilter topicFilter, SingleSubscriber subscriber) {
Array<Publish> retainedMessage = retainedMessageTree.getRetainedMessage(topicFilter);
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
for (Publish message : retainedMessage) {
result.add(startDelivering(message, subscriber));
}
return result;
}

private static String buildServiceDescription(
@Nullable MqttPublishOutMessageHandler[] publishOutMessageHandlers) {
var builder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
package javasabr.mqtt.service.impl;

import static javasabr.mqtt.model.SubscribeRetainHandling.SEND;
import static javasabr.mqtt.model.SubscribeRetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST;
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.SUCCESS;

import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
import javasabr.mqtt.model.session.ActiveSubscriptions;
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.SharedTopicFilter;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.ArrayFactory;
import javasabr.rlib.collections.array.MutableArray;
Expand All @@ -31,18 +35,12 @@
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class InMemorySubscriptionService implements SubscriptionService {

PublishDeliveringService publishDeliveringService;
ConcurrentSubscriberTree subscriberTree;

public InMemorySubscriptionService() {
public InMemorySubscriptionService(PublishDeliveringService publishDeliveringService) {
this.subscriberTree = new ConcurrentSubscriberTree();
}

@Override
public NetworkMqttUser resolveClient(Subscriber subscriber) {
if (subscriber instanceof SingleSubscriber single) {
return (NetworkMqttUser) single.user();
}
throw new IllegalArgumentException("Unexpected subscriber: " + subscriber);
this.publishDeliveringService = publishDeliveringService;
}

@Override
Expand Down Expand Up @@ -84,6 +82,10 @@ private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttSession
if (previous != null) {
activeSubscriptions.remove(previous.subscription());
}
if ((subscription.retainHandling() == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST && previous != null)
|| subscription.retainHandling() == SEND) {
sendRetainedMessages(user, subscription);
}
activeSubscriptions.add(subscription);
return subscription.qos().subscribeAckReasonCode();
}
Expand Down Expand Up @@ -137,4 +139,39 @@ public void restoreSubscriptions(NetworkMqttUser user, MqttSession session) {
subscriberTree.subscribe(user, subscription);
}
}

private void sendRetainedMessages(MqttUser user, Subscription subscription) {
int count = 0;
PublishHandlingResult errorResult = null;
if (subscription
.qos()
.subscribeAckReasonCode()
.ordinal() > 2) {
// TODO handle error ?
return;
}
SingleSubscriber singleSubscriber = new SingleSubscriber(user, subscription);
var results = publishDeliveringService.deliverRetainedMessages(subscription.topicFilter(), singleSubscriber);
for (PublishHandlingResult result : results) {
if (result.error()) {
errorResult = result;
} else if (result == PublishHandlingResult.SUCCESS) {
count++;
}
if (errorResult != null) {
log.debug(
user.clientId(),
errorResult,
"[%s] Found final error:[%s] during sending retained messages"::formatted);
// TODO handleError(client, publish, errorResult);
} else {
log.debug(
user.clientId(),
count,
"[%s] Successfully started delivering retained messages to [%s] subscribers"::formatted);
// TODO handleSuccessfulResult(client, publish, count);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import lombok.AccessLevel;
Expand All @@ -22,12 +22,18 @@ public abstract class AbstractMqttPublishOutMessageHandler<U extends NetworkMqtt
implements MqttPublishOutMessageHandler {

Class<U> expectedUser;
SubscriptionService subscriptionService;
MessageOutFactoryService messageOutFactoryService;

private static NetworkMqttUser resolveClient(Subscriber subscriber) {
if (subscriber instanceof SingleSubscriber single) {
return (NetworkMqttUser) single.user();
}
throw new IllegalArgumentException("Unexpected subscriber: " + subscriber);
}

@Override
public PublishHandlingResult handle(Publish publish, SingleSubscriber subscriber) {
NetworkMqttUser user = subscriptionService.resolveClient(subscriber);
NetworkMqttUser user = resolveClient(subscriber);
if (!expectedUser.isInstance(user)) {
log.warning(user, "Accepted not expected client:[%s]"::formatted);
return PublishHandlingResult.NOT_EXPECTED_CLIENT;
Expand Down
Loading
Loading