Skip to content

Commit efa9d00

Browse files
committed
Merge remote-tracking branch 'github/develop' into feature-broker-8
# Conflicts: # src/main/java/com/ss/mqtt/broker/network/packet/in/handler/ConnectInPacketHandler.java # src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy
2 parents 84f8146 + e442a76 commit efa9d00

22 files changed

+307
-100
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,20 @@
77
import com.ss.mqtt.broker.network.client.impl.DeviceMqttClient;
88
import com.ss.mqtt.broker.network.packet.PacketType;
99
import com.ss.mqtt.broker.network.packet.in.handler.*;
10-
import com.ss.mqtt.broker.service.ClientIdRegistry;
11-
import com.ss.mqtt.broker.service.ClientService;
12-
import com.ss.mqtt.broker.service.PublishingService;
13-
import com.ss.mqtt.broker.service.SubscriptionService;
10+
import com.ss.mqtt.broker.service.*;
1411
import com.ss.mqtt.broker.service.impl.*;
15-
import com.ss.rlib.network.*;
12+
import com.ss.rlib.network.BufferAllocator;
13+
import com.ss.rlib.network.Network;
14+
import com.ss.rlib.network.NetworkFactory;
15+
import com.ss.rlib.network.ServerNetworkConfig;
1616
import com.ss.rlib.network.impl.DefaultBufferAllocator;
1717
import com.ss.rlib.network.server.ServerNetwork;
1818
import lombok.RequiredArgsConstructor;
1919
import lombok.extern.log4j.Log4j2;
2020
import org.jetbrains.annotations.NotNull;
2121
import org.springframework.context.annotation.Bean;
2222
import org.springframework.context.annotation.Configuration;
23+
import org.springframework.context.annotation.PropertySource;
2324
import org.springframework.core.env.Environment;
2425

2526
import java.net.InetSocketAddress;
@@ -52,8 +53,8 @@ private interface ChannelFactory extends
5253
return new DefaultClientService();
5354
}
5455

55-
@NotNull
56-
@Bean ClientIdRegistry clientIdRegistry() {
56+
@Bean
57+
@NotNull ClientIdRegistry clientIdRegistry() {
5758
return new SimpleClientIdRegistry(
5859
env.getProperty(
5960
"client.id.available.chars",
@@ -63,15 +64,32 @@ private interface ChannelFactory extends
6364
);
6465
}
6566

67+
@Bean
68+
@NotNull CredentialSource credentialSource() {
69+
return new FileCredentialsSource(env.getProperty("credentials.source.file.name", "credentials"));
70+
}
71+
72+
@Bean
73+
@NotNull AuthenticationService authenticationService(
74+
@NotNull CredentialSource credentialSource,
75+
@NotNull ClientIdRegistry clientIdRegistry
76+
) {
77+
return new SimpleAuthenticationService(
78+
credentialSource,
79+
env.getProperty("authentication.allow.anonymous", boolean.class, false)
80+
);
81+
}
82+
6683
@Bean
6784
PacketInHandler @NotNull [] devicePacketHandlers(
85+
@NotNull AuthenticationService authenticationService,
6886
@NotNull ClientIdRegistry clientIdRegistry,
6987
@NotNull SubscriptionService subscriptionService,
7088
@NotNull PublishingService publishingService
7189
) {
7290

7391
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
74-
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(clientIdRegistry);
92+
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(clientIdRegistry, authenticationService);
7593
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
7694
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
7795
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.ss.mqtt.broker.exception;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class CredentialsSourceException extends RuntimeException {
6+
7+
public CredentialsSourceException(@NotNull String message) {
8+
super(message);
9+
}
10+
11+
public CredentialsSourceException(@NotNull Throwable cause) {
12+
super(cause);
13+
}
14+
}

src/main/java/com/ss/mqtt/broker/service/Subscriber.java renamed to src/main/java/com/ss/mqtt/broker/model/Subscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ss.mqtt.broker.service;
1+
package com.ss.mqtt.broker.model;
22

33
import com.ss.mqtt.broker.model.QoS;
44
import com.ss.mqtt.broker.model.SubscribeRetainHandling;

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public class ConnectInPacket extends MqttReadablePacket {
192192
private @NotNull String clientId;
193193
private @NotNull String willTopic;
194194
private @NotNull String username;
195-
private @NotNull String password;
195+
private @NotNull byte[] password;
196196

197197
private @NotNull byte[] willPayload;
198198

@@ -223,7 +223,7 @@ public ConnectInPacket(byte info) {
223223
this.clientId = StringUtils.EMPTY;
224224
this.willTopic = StringUtils.EMPTY;
225225
this.username = StringUtils.EMPTY;
226-
this.password = StringUtils.EMPTY;
226+
this.password = ArrayUtils.EMPTY_BYTE_ARRAY;
227227
this.authenticationMethod = StringUtils.EMPTY;
228228
this.willPayload = ArrayUtils.EMPTY_BYTE_ARRAY;
229229
this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY;
@@ -315,13 +315,14 @@ protected void readPayload(@NotNull MqttConnection connection, @NotNull ByteBuff
315315
willPayload = readBytes(buffer);
316316
}
317317

318-
if (hasPassword) {
319-
password = readString(buffer);
320-
}
321-
322318
if (hasUserName) {
323319
username = readString(buffer);
324320
}
321+
322+
if (hasPassword) {
323+
password = readBytes(buffer);
324+
}
325+
325326
}
326327

327328
@Override

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/ConnectInPacketHandler.java

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
package com.ss.mqtt.broker.network.packet.in.handler;
22

3+
import static reactor.core.publisher.Mono.fromCallable;
34
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
45
import static reactor.core.publisher.Mono.fromRunnable;
56
import com.ss.mqtt.broker.exception.ConnectionRejectException;
67
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
78
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
89
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
910
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
11+
import com.ss.mqtt.broker.service.AuthenticationService;
1012
import com.ss.mqtt.broker.service.ClientIdRegistry;
1113
import com.ss.rlib.common.util.StringUtils;
1214
import lombok.RequiredArgsConstructor;
1315
import org.jetbrains.annotations.NotNull;
16+
import reactor.core.publisher.Mono;
1417

1518
@RequiredArgsConstructor
1619
public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, ConnectInPacket> {
1720

1821
private final ClientIdRegistry clientIdRegistry;
22+
private final AuthenticationService authenticationService;
1923

2024
@Override
2125
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -27,30 +31,56 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPa
2731
return;
2832
}
2933

30-
var requestedClientId = packet.getClientId();
34+
authenticationService.auth(packet.getUsername(), packet.getPassword())
35+
.filter(Boolean::booleanValue)
36+
.flatMap(authenticated -> registerClient(client, packet))
37+
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD)))
38+
.subscribe();
39+
}
3140

32-
// if we should assign our client id
41+
private @NotNull Mono<Boolean> registerClient(
42+
@NotNull UnsafeMqttClient client,
43+
@NotNull ConnectInPacket packet
44+
) {
45+
var requestedClientId = packet.getClientId();
3346
if (StringUtils.isEmpty(requestedClientId)) {
34-
clientIdRegistry.generate()
35-
.doOnNext(client::setClientId)
36-
.flatMap(clientIdRegistry::register)
37-
.filter(Boolean::booleanValue)
38-
.then(fromRunnable(() -> onConnected(client, packet, requestedClientId)))
39-
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID)))
40-
.subscribe();
47+
return processWithoutClientId(client, packet, requestedClientId);
4148
} else {
42-
clientIdRegistry.register(requestedClientId)
43-
.subscribe(result -> {
44-
if(!result) {
45-
client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID);
46-
} else {
47-
client.setClientId(requestedClientId);
48-
onConnected(client, packet, requestedClientId);
49-
}
50-
});
49+
return processWithClientId(client, packet, requestedClientId);
5150
}
5251
}
5352

53+
private @NotNull Mono<Boolean> processWithClientId(
54+
@NotNull UnsafeMqttClient client,
55+
@NotNull ConnectInPacket packet,
56+
@NotNull String requestedClientId
57+
) {
58+
return clientIdRegistry.register(requestedClientId)
59+
.filter(Boolean::booleanValue)
60+
.flatMap(registered -> fromCallable(() -> {
61+
client.setClientId(requestedClientId);
62+
onConnected(client, packet, requestedClientId);
63+
return true;
64+
}))
65+
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID)));
66+
}
67+
68+
private @NotNull Mono<Boolean> processWithoutClientId(
69+
@NotNull UnsafeMqttClient client,
70+
@NotNull ConnectInPacket packet,
71+
@NotNull String requestedClientId
72+
) {
73+
return clientIdRegistry.generate()
74+
.doOnNext(client::setClientId)
75+
.flatMap(clientIdRegistry::register)
76+
.filter(Boolean::booleanValue)
77+
.flatMap(registered -> fromCallable(() -> {
78+
onConnected(client, packet, requestedClientId);
79+
return true;
80+
}))
81+
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID)));
82+
}
83+
5484
private void onConnected(
5585
@NotNull UnsafeMqttClient client,
5686
@NotNull ConnectInPacket packet,
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.ss.mqtt.broker.service;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import reactor.core.publisher.Mono;
5+
6+
public interface AuthenticationService {
7+
8+
@NotNull Mono<Boolean> auth(@NotNull String userName, @NotNull byte[] password);
9+
10+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.ss.mqtt.broker.service;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import reactor.core.publisher.Mono;
5+
6+
public interface CredentialSource {
7+
8+
@NotNull Mono<Boolean> check(@NotNull String user, @NotNull byte[] pass);
9+
10+
@NotNull Mono<Boolean> check(@NotNull byte[] pass);
11+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.ss.mqtt.broker.service.impl;
2+
3+
import com.ss.mqtt.broker.service.CredentialSource;
4+
import org.jetbrains.annotations.NotNull;
5+
import reactor.core.publisher.Mono;
6+
7+
import java.nio.charset.StandardCharsets;
8+
import java.util.Arrays;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
12+
public abstract class AbstractCredentialSource implements CredentialSource {
13+
14+
private final Map<String, byte[]> credentials = new HashMap<>();
15+
16+
abstract void init();
17+
18+
void putCredentials(@NotNull Object user, @NotNull Object pass) {
19+
credentials.put(user.toString(), pass.toString().getBytes(StandardCharsets.UTF_8));
20+
}
21+
22+
@Override
23+
public @NotNull Mono<Boolean> check(@NotNull String user, @NotNull byte[] pass) {
24+
return Mono.just(Arrays.equals(pass, credentials.get(user)));
25+
}
26+
27+
@Override
28+
public @NotNull Mono<Boolean> check(@NotNull byte[] pass) {
29+
return Mono.just(Boolean.FALSE);
30+
}
31+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.ss.mqtt.broker.service.impl;
2+
3+
import com.ss.mqtt.broker.exception.CredentialsSourceException;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
import java.io.FileInputStream;
7+
import java.io.IOException;
8+
import java.util.Properties;
9+
10+
public class FileCredentialsSource extends AbstractCredentialSource {
11+
12+
private final String fileName;
13+
14+
public FileCredentialsSource(@NotNull String fileName) {
15+
this.fileName = fileName;
16+
init();
17+
}
18+
19+
@Override
20+
void init() {
21+
var credentialUrl = FileCredentialsSource.class.getClassLoader().getResource(fileName);
22+
if (credentialUrl == null) {
23+
throw new CredentialsSourceException("Credentials file could not be found");
24+
}
25+
try {
26+
var credentialsProperties = new Properties();
27+
credentialsProperties.load(new FileInputStream(credentialUrl.getPath()));
28+
credentialsProperties.forEach(this::putCredentials);
29+
} catch (IOException e) {
30+
throw new CredentialsSourceException(e);
31+
}
32+
}
33+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.ss.mqtt.broker.service.impl;
2+
3+
import com.ss.mqtt.broker.service.AuthenticationService;
4+
import com.ss.mqtt.broker.service.CredentialSource;
5+
import com.ss.rlib.common.util.StringUtils;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
import reactor.core.publisher.Mono;
9+
10+
@RequiredArgsConstructor
11+
public class SimpleAuthenticationService implements AuthenticationService {
12+
13+
private final @NotNull CredentialSource credentialSource;
14+
private final boolean allowAnonymousAuth;
15+
16+
@Override
17+
public @NotNull Mono<Boolean> auth(@NotNull String userName, @NotNull byte[] password) {
18+
if (allowAnonymousAuth && userName.equals(StringUtils.EMPTY)) {
19+
return Mono.just(Boolean.TRUE);
20+
} else {
21+
return credentialSource.check(userName, password);
22+
}
23+
}
24+
25+
}

0 commit comments

Comments
 (0)