Skip to content

Commit 3d8679c

Browse files
authored
Merge pull request #13 from JavaSaBr/feature-broker-12
[broker-12] add concurrent access support to subscription and authent…
2 parents fc0aee7 + 3e7aef6 commit 3d8679c

File tree

9 files changed

+83
-41
lines changed

9 files changed

+83
-41
lines changed

src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
public class SubscribeTopicFilter {
99

1010
/**
11-
* The subscriber's topic filter.
11+
* The subscriber's topic name.
1212
*/
13-
private final String topicFilter;
13+
private final String topicName;
1414

1515
/**
1616
* Maximum QoS field. This gives the maximum QoS level at which the Server

src/main/java/com/ss/mqtt/broker/service/SubscriptionService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.ss.mqtt.broker.model.SubscribeAckReasonCode;
44
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
5+
import com.ss.mqtt.broker.model.Subscriber;
56
import com.ss.mqtt.broker.model.UnsubscribeAckReasonCode;
67
import com.ss.mqtt.broker.network.client.MqttClient;
78
import com.ss.rlib.common.util.array.Array;
@@ -42,5 +43,5 @@ public interface SubscriptionService {
4243
* @param topicName topic name
4344
* @return array of topic subscribers
4445
*/
45-
@NotNull Array<MqttClient> getSubscribers(@NotNull String topicName);
46+
@NotNull Array<Subscriber> getSubscribers(@NotNull String topicName);
4647
}

src/main/java/com/ss/mqtt/broker/service/Subscriptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.ss.mqtt.broker.model.SubscribeAckReasonCode;
44
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
5+
import com.ss.mqtt.broker.model.Subscriber;
56
import com.ss.mqtt.broker.model.UnsubscribeAckReasonCode;
67
import com.ss.mqtt.broker.network.client.MqttClient;
78
import com.ss.rlib.common.util.array.Array;
@@ -18,7 +19,7 @@ public interface Subscriptions {
1819
* @param topicName topic name on which subscribers should be returned
1920
* @return array of MQTT clients
2021
*/
21-
@NotNull Array<MqttClient> getSubscribers(@NotNull String topicName);
22+
@NotNull Array<Subscriber> getSubscribers(@NotNull String topicName);
2223

2324
/**
2425
* Returns result of subscription adding

src/main/java/com/ss/mqtt/broker/service/impl/AbstractCredentialSource.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,27 @@
11
package com.ss.mqtt.broker.service.impl;
22

33
import com.ss.mqtt.broker.service.CredentialSource;
4+
import com.ss.rlib.common.util.dictionary.ConcurrentObjectDictionary;
5+
import com.ss.rlib.common.util.dictionary.Dictionary;
6+
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
47
import org.jetbrains.annotations.NotNull;
58
import reactor.core.publisher.Mono;
69

7-
import java.nio.charset.StandardCharsets;
810
import java.util.Arrays;
9-
import java.util.HashMap;
10-
import java.util.Map;
1111

1212
public abstract class AbstractCredentialSource implements CredentialSource {
1313

14-
private final Map<String, byte[]> credentials = new HashMap<>();
14+
private final ConcurrentObjectDictionary<String, byte[]> credentials =
15+
ConcurrentObjectDictionary.ofType(String.class, byte[].class);
1516

1617
abstract void init();
1718

18-
void putCredentials(@NotNull Object user, @NotNull Object pass) {
19-
credentials.put(user.toString(), pass.toString().getBytes(StandardCharsets.UTF_8));
19+
void putAll(@NotNull Dictionary<String, byte[]> creds) {
20+
credentials.runInWriteLock(creds, Dictionary::put);
21+
}
22+
23+
void put(@NotNull String user, @NotNull byte[] pass) {
24+
credentials.runInWriteLock(user, pass, ObjectDictionary::put);
2025
}
2126

2227
@Override

src/main/java/com/ss/mqtt/broker/service/impl/FileCredentialsSource.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.ss.mqtt.broker.service.impl;
22

33
import com.ss.mqtt.broker.exception.CredentialsSourceException;
4+
import com.ss.rlib.common.util.dictionary.Dictionary;
5+
import com.ss.rlib.common.util.dictionary.DictionaryCollectors;
46
import org.jetbrains.annotations.NotNull;
57

68
import java.io.FileInputStream;
79
import java.io.IOException;
10+
import java.nio.charset.StandardCharsets;
811
import java.util.Properties;
912

1013
public class FileCredentialsSource extends AbstractCredentialSource {
@@ -25,7 +28,16 @@ void init() {
2528
try {
2629
var credentialsProperties = new Properties();
2730
credentialsProperties.load(new FileInputStream(credentialUrl.getPath()));
28-
credentialsProperties.forEach(this::putCredentials);
31+
32+
Dictionary<String, byte[]> creds = credentialsProperties.entrySet()
33+
.stream()
34+
.collect(DictionaryCollectors.toObjectDictionary(
35+
entry -> entry.getKey().toString(),
36+
entry -> entry.getValue().toString().getBytes(StandardCharsets.UTF_8)
37+
));
38+
39+
putAll(creds);
40+
2941
} catch (IOException e) {
3042
throw new CredentialsSourceException(e);
3143
}

src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.ss.mqtt.broker.service.impl;
22

33
import com.ss.mqtt.broker.model.PublishAckReasonCode;
4-
import com.ss.mqtt.broker.network.client.MqttClient;
4+
import com.ss.mqtt.broker.model.Subscriber;
55
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
66
import com.ss.mqtt.broker.service.PublishingService;
77
import com.ss.mqtt.broker.service.SubscriptionService;
@@ -17,10 +17,11 @@ public class SimplePublishingService implements PublishingService {
1717
private final @NotNull SubscriptionService subscriptionService;
1818

1919
private static @NotNull PublishAckReasonCode send(
20-
@NotNull MqttClient mqttClient,
20+
@NotNull Subscriber subscriber,
2121
@NotNull PublishInPacket publish
2222
) {
2323

24+
var mqttClient = subscriber.getMqttClient();
2425
mqttClient.send(mqttClient.getPacketOutFactory().newPublish(
2526
mqttClient,
2627
publish.getPacketId(),
@@ -50,7 +51,7 @@ public class SimplePublishingService implements PublishingService {
5051
}
5152

5253
var success = subscribers.stream()
53-
.map(targetMqttClient -> send(targetMqttClient, publish))
54+
.map(subscriber -> send(subscriber, publish))
5455
.allMatch(ackReasonCode -> ackReasonCode.equals(PublishAckReasonCode.SUCCESS));
5556

5657
return success ? PublishAckReasonCode.SUCCESS : PublishAckReasonCode.UNSPECIFIED_ERROR;

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptionService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.ss.mqtt.broker.model.SubscribeAckReasonCode;
44
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
5+
import com.ss.mqtt.broker.model.Subscriber;
56
import com.ss.mqtt.broker.model.UnsubscribeAckReasonCode;
67
import com.ss.mqtt.broker.network.client.MqttClient;
78
import com.ss.mqtt.broker.service.SubscriptionService;
@@ -40,7 +41,7 @@ public class SimpleSubscriptionService implements SubscriptionService {
4041
}
4142

4243
@Override
43-
public @NotNull Array<MqttClient> getSubscribers(@NotNull String topicName) {
44+
public @NotNull Array<Subscriber> getSubscribers(@NotNull String topicName) {
4445
return subscriptions.getSubscribers(topicName);
4546
}
4647
}

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptions.java

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,50 +6,71 @@
66
import com.ss.mqtt.broker.model.UnsubscribeAckReasonCode;
77
import com.ss.mqtt.broker.network.client.MqttClient;
88
import com.ss.mqtt.broker.service.Subscriptions;
9+
import com.ss.rlib.common.function.NotNullSupplier;
910
import com.ss.rlib.common.util.array.Array;
10-
import com.ss.rlib.common.util.array.ArrayCollectors;
11-
import com.ss.rlib.common.util.array.impl.FastArraySet;
11+
import com.ss.rlib.common.util.array.ConcurrentArray;
12+
import com.ss.rlib.common.util.dictionary.ConcurrentObjectDictionary;
13+
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
1214
import org.jetbrains.annotations.NotNull;
1315

14-
import java.util.HashMap;
15-
import java.util.Map;
16-
1716
/**
1817
* Simple container of subscriptions
1918
*/
2019
public class SimpleSubscriptions implements Subscriptions {
2120

22-
private final @NotNull Map<String, Array<Subscriber>> subscriptions = new HashMap<>();
21+
private final static @NotNull NotNullSupplier<ConcurrentArray<Subscriber>> SUBSCRIBER_ARRAY_SUPPLIER =
22+
ConcurrentArray.supplier(Subscriber.class);
23+
24+
private final @NotNull ConcurrentObjectDictionary<String, ConcurrentArray<Subscriber>> subscriptions =
25+
ConcurrentObjectDictionary.ofType(String.class, ConcurrentArray.class);
2326

24-
public @NotNull Array<MqttClient> getSubscribers(@NotNull String topicName) {
25-
return subscriptions.get(topicName)
26-
.stream()
27-
.map(Subscriber::getMqttClient)
28-
.collect(ArrayCollectors.toArray(MqttClient.class));
27+
public @NotNull Array<Subscriber> getSubscribers(@NotNull String topicName) {
28+
29+
var subscribers = subscriptions.getInReadLock(topicName, ObjectDictionary::get);
30+
if (subscribers == null) {
31+
return Array.empty();
32+
}
33+
34+
//noinspection ConstantConditions
35+
return subscribers.getInReadLock(Array::of);
2936
}
3037

3138
public @NotNull SubscribeAckReasonCode addSubscription(
3239
@NotNull SubscribeTopicFilter topicFilter,
3340
@NotNull MqttClient mqttClient
3441
) {
3542
var subscriber = new Subscriber(mqttClient, topicFilter);
36-
var subscribers = subscriptions.computeIfAbsent(
37-
topicFilter.getTopicFilter(),
38-
key -> new FastArraySet<>(Subscriber.class)
39-
);
40-
subscribers.add(subscriber);
43+
var subscribers = subscriptions.getInReadLock(topicFilter.getTopicName(), ObjectDictionary::get);
44+
45+
if (subscribers == null) {
46+
subscribers = subscriptions.getInWriteLock(
47+
topicFilter.getTopicName(),
48+
SUBSCRIBER_ARRAY_SUPPLIER,
49+
ObjectDictionary::getOrCompute
50+
);
51+
}
52+
53+
//noinspection ConstantConditions
54+
subscribers.runInWriteLock(subscriber, Array::add);
55+
4156
return topicFilter.getQos().getSubscribeAckReasonCode();
4257
}
4358

4459
public @NotNull UnsubscribeAckReasonCode removeSubscription(
4560
@NotNull String topicName,
4661
@NotNull MqttClient mqttClient
4762
) {
48-
var subscribers = subscriptions.getOrDefault(topicName, Array.empty());
49-
if (subscribers.removeIf(subscriber -> mqttClient.equals(subscriber.getMqttClient()))) {
50-
return UnsubscribeAckReasonCode.SUCCESS;
51-
} else {
63+
var subscribers = subscriptions.getInReadLock(topicName, ObjectDictionary::get);
64+
65+
if (subscribers == null) {
5266
return UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
67+
} else {
68+
boolean removed = subscribers.removeIfInWriteLock(
69+
mqttClient,
70+
(client, subscriber) -> client.equals(subscriber.getMqttClient())
71+
);
72+
73+
return removed ? UnsubscribeAckReasonCode.SUCCESS : UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
5374
}
5475
}
5576
}

src/test/groovy/com/ss/mqtt/broker/test/network/in/SubscribeInPacketTest.groovy

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
2929
result
3030
packet.topicFilters.size() == 2
3131
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
32-
packet.topicFilters.get(0).getTopicFilter() == topicFilter
32+
packet.topicFilters.get(0).getTopicName() == topicFilter
3333
packet.topicFilters.get(0).isNoLocal()
3434
!packet.topicFilters.get(0).isRetainAsPublished()
3535
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
3636
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
37-
packet.topicFilters.get(1).getTopicFilter() == topicFilter2
37+
packet.topicFilters.get(1).getTopicName() == topicFilter2
3838
packet.topicFilters.get(1).isNoLocal()
3939
!packet.topicFilters.get(1).isRetainAsPublished()
4040
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
@@ -69,12 +69,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
6969
result
7070
packet.topicFilters.size() == 2
7171
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
72-
packet.topicFilters.get(0).getTopicFilter() == topicFilter
72+
packet.topicFilters.get(0).getTopicName() == topicFilter
7373
!packet.topicFilters.get(0).isNoLocal()
7474
packet.topicFilters.get(0).isRetainAsPublished()
7575
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
7676
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
77-
packet.topicFilters.get(1).getTopicFilter() == topicFilter2
77+
packet.topicFilters.get(1).getTopicName() == topicFilter2
7878
packet.topicFilters.get(1).isNoLocal()
7979
!packet.topicFilters.get(1).isRetainAsPublished()
8080
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_SUBSCRIBE_ONLY_IF_THE_SUBSCRIPTION_DOES_NOT_CURRENTLY_EXIST
@@ -98,12 +98,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
9898
result
9999
packet.topicFilters.size() == 2
100100
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
101-
packet.topicFilters.get(0).getTopicFilter() == topicFilter
101+
packet.topicFilters.get(0).getTopicName() == topicFilter
102102
!packet.topicFilters.get(0).isNoLocal()
103103
!packet.topicFilters.get(0).isRetainAsPublished()
104104
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
105105
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
106-
packet.topicFilters.get(1).getTopicFilter() == topicFilter2
106+
packet.topicFilters.get(1).getTopicName() == topicFilter2
107107
!packet.topicFilters.get(1).isNoLocal()
108108
!packet.topicFilters.get(1).isRetainAsPublished()
109109
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE

0 commit comments

Comments
 (0)