Skip to content

Commit 94ecae6

Browse files
committed
[broker-5] add validating UTF-8 string during packet reading
1 parent 5b237ef commit 94ecae6

File tree

11 files changed

+74
-47
lines changed

11 files changed

+74
-47
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.ss.mqtt.broker.service.SubscriptionService;
1414
import com.ss.mqtt.broker.service.impl.*;
1515
import com.ss.rlib.network.*;
16-
import com.ss.mqtt.broker.service.impl.SimplePublishingService;
1716
import com.ss.rlib.network.impl.DefaultBufferAllocator;
1817
import com.ss.rlib.network.server.ServerNetwork;
1918
import lombok.RequiredArgsConstructor;
@@ -81,28 +80,26 @@ private interface ChannelFactory extends
8180
}
8281

8382
@Bean
84-
@NotNull ServerNetwork<MqttConnection> deviceNetwork(
83+
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork(
8584
@NotNull ServerNetworkConfig networkConfig,
8685
@NotNull BufferAllocator bufferAllocator,
8786
@NotNull MqttConnectionConfig deviceConnectionConfig,
88-
@NotNull SubscriptionService subscriptionService,
8987
PacketInHandler @NotNull [] devicePacketHandlers
9088
) {
9189
return NetworkFactory.newServerNetwork(
9290
networkConfig,
9391
deviceConnectionFactory(
9492
bufferAllocator,
9593
deviceConnectionConfig,
96-
subscriptionService,
9794
devicePacketHandlers
9895
)
9996
);
10097
}
10198

10299
@Bean
103100
@NotNull InetSocketAddress deviceNetworkAddress(
104-
@NotNull ServerNetwork<MqttConnection> deviceNetwork,
105-
@NotNull Consumer<MqttConnection> mqttConnectionConsumer
101+
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,
102+
@NotNull Consumer<@NotNull MqttConnection> mqttConnectionConsumer
106103
) {
107104

108105
var address = new InetSocketAddress("localhost", 1883);
@@ -124,7 +121,7 @@ private interface ChannelFactory extends
124121
}
125122

126123
@Bean
127-
@NotNull Consumer<MqttConnection> mqttConnectionConsumer() {
124+
@NotNull Consumer<@NotNull MqttConnection> mqttConnectionConsumer() {
128125
return mqttConnection -> {
129126
log.info("Accepted connection: {}", mqttConnection);
130127
var client = (UnsafeMqttClient) mqttConnection.getClient();
@@ -167,7 +164,6 @@ private interface ChannelFactory extends
167164
private @NotNull ChannelFactory deviceConnectionFactory(
168165
@NotNull BufferAllocator bufferAllocator,
169166
@NotNull MqttConnectionConfig connectionConfig,
170-
@NotNull SubscriptionService subscriptionService,
171167
PacketInHandler @NotNull [] packetHandlers
172168
) {
173169
return (network, channel) -> new MqttConnection(
@@ -176,7 +172,6 @@ private interface ChannelFactory extends
176172
bufferAllocator,
177173
100,
178174
packetHandlers,
179-
subscriptionService,
180175
connectionConfig,
181176
DeviceMqttClient::new
182177
);

src/main/java/com/ss/mqtt/broker/network/MqttConnection.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
1010
import com.ss.mqtt.broker.network.packet.in.handler.PacketInHandler;
1111
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
12-
import com.ss.mqtt.broker.service.SubscriptionService;
1312
import com.ss.rlib.network.BufferAllocator;
1413
import com.ss.rlib.network.Connection;
1514
import com.ss.rlib.network.Network;
1615
import com.ss.rlib.network.NetworkCryptor;
1716
import com.ss.rlib.network.impl.AbstractConnection;
1817
import com.ss.rlib.network.packet.PacketReader;
1918
import com.ss.rlib.network.packet.PacketWriter;
19+
import lombok.AccessLevel;
2020
import lombok.Getter;
2121
import lombok.Setter;
2222
import lombok.extern.log4j.Log4j2;
@@ -26,27 +26,27 @@
2626
import java.util.function.Function;
2727

2828
@Log4j2
29-
@Getter
3029
public class MqttConnection extends AbstractConnection<MqttReadablePacket, MqttWritablePacket> {
3130

31+
@Getter(AccessLevel.PROTECTED)
3232
private final @NotNull PacketReader packetReader;
33+
34+
@Getter(AccessLevel.PROTECTED)
3335
private final @NotNull PacketWriter packetWriter;
3436

35-
private final PacketInHandler @NotNull [] packetHandlers;
36-
private final @NotNull SubscriptionService subscriptionService;
37+
private final @Getter PacketInHandler @NotNull [] packetHandlers;
3738

3839
private final @Getter @NotNull MqttClient client;
3940
private final @Getter @NotNull MqttConnectionConfig config;
4041

41-
private volatile @Setter @NotNull MqttVersion mqttVersion;
42+
private volatile @Getter @Setter @NotNull MqttVersion mqttVersion;
4243

4344
public MqttConnection(
4445
@NotNull Network<? extends Connection<MqttReadablePacket, MqttWritablePacket>> network,
4546
@NotNull AsynchronousSocketChannel channel,
4647
@NotNull BufferAllocator bufferAllocator,
4748
int maxPacketsByRead,
4849
PacketInHandler @NotNull [] packetHandlers,
49-
@NotNull SubscriptionService subscriptionService,
5050
@NotNull MqttConnectionConfig config,
5151
@NotNull Function<MqttConnection, UnsafeMqttClient> clientFactory
5252
) {
@@ -56,7 +56,6 @@ public MqttConnection(
5656
this.mqttVersion = MqttVersion.MQTT_3_1_1;
5757
this.packetReader = createPacketReader();
5858
this.packetWriter = createPacketWriter();
59-
this.subscriptionService = subscriptionService;
6059
this.client = clientFactory.apply(this);
6160
}
6261

src/main/java/com/ss/mqtt/broker/network/client/impl/AbstractMqttClient.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,20 @@
1515
import lombok.extern.log4j.Log4j2;
1616
import org.jetbrains.annotations.NotNull;
1717

18+
@Getter
1819
@Log4j2
1920
@EqualsAndHashCode(of = "clientId")
2021
public abstract class AbstractMqttClient implements UnsafeMqttClient {
2122

22-
protected final @Getter@NotNull MqttConnection connection;
23+
protected final @NotNull MqttConnection connection;
2324

24-
private volatile @Setter @Getter @NotNull String clientId;
25+
private volatile @Setter @NotNull String clientId;
2526

26-
private volatile @Getter long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
27-
private volatile @Getter int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
28-
private volatile @Getter int maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
29-
private volatile @Getter int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
30-
private volatile @Getter int keepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_MAX;
27+
private volatile long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
28+
private volatile int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
29+
private volatile int maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
30+
private volatile int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
31+
private volatile int keepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_MAX;
3132

3233
public AbstractMqttClient(@NotNull MqttConnection connection) {
3334
this.connection = connection;

src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,38 @@
1313
import com.ss.rlib.common.util.array.ArrayFactory;
1414
import com.ss.rlib.network.packet.impl.AbstractReadablePacket;
1515
import lombok.Getter;
16+
import lombok.RequiredArgsConstructor;
1617
import org.jetbrains.annotations.NotNull;
1718
import org.jetbrains.annotations.Nullable;
1819

1920
import java.nio.ByteBuffer;
21+
import java.nio.CharBuffer;
22+
import java.nio.charset.CharsetDecoder;
23+
import java.nio.charset.CodingErrorAction;
2024
import java.nio.charset.StandardCharsets;
2125
import java.util.Collections;
2226
import java.util.Set;
2327

2428
public abstract class MqttReadablePacket extends AbstractReadablePacket<MqttConnection> {
2529

30+
@Getter
31+
@RequiredArgsConstructor
32+
private static class Utf8Decoder {
33+
private final CharsetDecoder decoder;
34+
private final ByteBuffer inBuffer;
35+
private final CharBuffer outBuffer;
36+
}
37+
38+
private static final ThreadLocal<Utf8Decoder> LOCAL_DECODER = ThreadLocal.withInitial(() -> {
39+
40+
var decoder = StandardCharsets.UTF_8.newDecoder()
41+
.onMalformedInput(CodingErrorAction.REPORT)
42+
.onUnmappableCharacter(CodingErrorAction.REPORT);
43+
44+
return new Utf8Decoder(decoder, ByteBuffer.allocate(10240), CharBuffer.allocate(10240));
45+
});
46+
47+
2648
/**
2749
* The list of user properties.
2850
*/
@@ -161,9 +183,30 @@ protected long readUnsignedInt(@NotNull ByteBuffer buffer) {
161183

162184
@Override
163185
protected @NotNull String readString(@NotNull ByteBuffer buffer) {
164-
var stringData = new byte[readShort(buffer) & 0xFFFF];
165-
buffer.get(stringData);
166-
return new String(stringData, StandardCharsets.UTF_8);
186+
187+
var utf8Decoder = LOCAL_DECODER.get();
188+
var inBuffer = utf8Decoder.getInBuffer();
189+
190+
var stringLength = readShort(buffer) & 0xFFFF;
191+
192+
if (stringLength > inBuffer.capacity()) {
193+
throw new ConnectionRejectException(ConnectAckReasonCode.MALFORMED_PACKET);
194+
}
195+
196+
var decoder = utf8Decoder.getDecoder();
197+
var outBuffer = utf8Decoder.getOutBuffer();
198+
199+
buffer.get(inBuffer.clear().array(), 0, stringLength);
200+
201+
decoder.reset();
202+
203+
var result = decoder.decode(inBuffer.position(stringLength).flip(), outBuffer.clear(), true);
204+
205+
if (result.isError()) {
206+
throw new ConnectionRejectException(ConnectAckReasonCode.MALFORMED_PACKET);
207+
}
208+
209+
return new String(inBuffer.array(), 0, stringLength, StandardCharsets.UTF_8);
167210
}
168211

169212
protected @NotNull byte[] readBytes(@NotNull ByteBuffer buffer) {

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/ConnectInPacketHandler.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.ss.rlib.common.util.StringUtils;
1010
import lombok.RequiredArgsConstructor;
1111
import org.jetbrains.annotations.NotNull;
12-
import reactor.core.publisher.Mono;
1312

1413
@RequiredArgsConstructor
1514
public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, ConnectInPacket> {
@@ -38,12 +37,6 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPa
3837
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID)))
3938
.subscribe();
4039
} else {
41-
42-
if (!clientIdRegistry.validate(requestedClientId)) {
43-
client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID);
44-
return;
45-
}
46-
4740
clientIdRegistry.register(requestedClientId)
4841
.subscribe(result -> {
4942
if(!result) {

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/PublishInPacketHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
@RequiredArgsConstructor
1010
public class PublishInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishInPacket> {
1111

12-
private final PublishingService publishingService;
12+
private final @NotNull PublishingService publishingService;
1313

1414
@Override
1515
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishInPacket packet) {

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/SubscribeInPacketHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
@RequiredArgsConstructor
1010
public class SubscribeInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, SubscribeInPacket> {
1111

12-
private final SubscriptionService subscriptionService;
12+
private final @NotNull SubscriptionService subscriptionService;
1313

1414
@Override
1515
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull SubscribeInPacket packet) {

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/UnsubscribeInPacketHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
@RequiredArgsConstructor
1010
public class UnsubscribeInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, UnsubscribeInPacket> {
1111

12-
private final SubscriptionService subscriptionService;
12+
private final @NotNull SubscriptionService subscriptionService;
1313

1414
@Override
1515
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull UnsubscribeInPacket packet) {

src/main/java/com/ss/mqtt/broker/service/impl/SimpleClientIdRegistry.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,22 @@ public SimpleClientIdRegistry(@NotNull String availableChars, int maxClientIdLen
3232
@Override
3333
public @NotNull Mono<Boolean> register(@NotNull String clientId) {
3434

35-
var value = clientIdRegistry.getInReadLock(clientId, ObjectDictionary::get);
35+
if (!validate(clientId)) {
36+
return Mono.just(Boolean.FALSE);
37+
}
3638

39+
var value = clientIdRegistry.getInReadLock(clientId, ObjectDictionary::get);
3740
if (value != null) {
3841
return Mono.just(Boolean.FALSE);
3942
}
4043

4144
var stamp = clientIdRegistry.writeLock();
4245
try {
43-
4446
value = clientIdRegistry.get(clientId);
45-
4647
if (value != null) {
4748
return Mono.just(Boolean.FALSE);
4849
}
49-
5050
clientIdRegistry.put(clientId, CLIENT_ID_VALUE);
51-
5251
} finally {
5352
clientIdRegistry.writeUnlock(stamp);
5453
}

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptions.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,11 @@ public class SimpleSubscriptions implements Subscriptions {
4848
@NotNull String topicFilter,
4949
@NotNull MqttClient mqttClient
5050
) {
51-
5251
var subscribers = subscriptions.getOrDefault(topicFilter, Array.empty());
53-
5452
if (subscribers.removeIf(subscriber -> mqttClient.equals(subscriber.getMqttClient()))) {
5553
return UnsubscribeAckReasonCode.SUCCESS;
54+
} else {
55+
return UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
5656
}
57-
58-
return UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
5957
}
60-
6158
}

0 commit comments

Comments
 (0)