Skip to content

Commit 98a7549

Browse files
committed
add mechanism to handle exceptions on packet level
1 parent eadbf36 commit 98a7549

File tree

7 files changed

+84
-11
lines changed

7 files changed

+84
-11
lines changed
Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
package com.ss.mqtt.broker.exception;
22

33
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
4-
import lombok.RequiredArgsConstructor;
4+
import lombok.Getter;
55
import org.jetbrains.annotations.NotNull;
66

7-
@RequiredArgsConstructor
8-
public class ConnectionRejectException extends RuntimeException {
7+
public class ConnectionRejectException extends MqttException {
98

10-
private final @NotNull ConnectAckReasonCode reasonCode;
9+
private final @Getter @NotNull ConnectAckReasonCode reasonCode;
10+
11+
public ConnectionRejectException(@NotNull ConnectAckReasonCode reasonCode) {
12+
this.reasonCode = reasonCode;
13+
}
14+
15+
public ConnectionRejectException(@NotNull Throwable cause, @NotNull ConnectAckReasonCode reasonCode) {
16+
super(cause);
17+
this.reasonCode = reasonCode;
18+
}
1119
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.ss.mqtt.broker.exception;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
public class MqttException extends RuntimeException {
6+
7+
public MqttException() {
8+
}
9+
10+
public MqttException(@NotNull String message) {
11+
super(message);
12+
}
13+
14+
public MqttException(@NotNull Throwable cause) {
15+
super(cause);
16+
}
17+
}

src/main/java/com/ss/mqtt/broker/network/MqttClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ss.mqtt.broker.network;
22

3+
import com.ss.mqtt.broker.exception.ConnectionRejectException;
34
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
45
import com.ss.mqtt.broker.model.MqttPropertyConstants;
56
import com.ss.mqtt.broker.network.packet.PacketType;
@@ -55,6 +56,19 @@ public void reject(@NotNull ConnectAckReasonCode returnCode) {
5556
private void onConnected(@NotNull ConnectInPacket connect) {
5657
connection.setMqttVersion(connect.getMqttVersion());
5758

59+
var exception = connect.getException();
60+
61+
if (exception instanceof ConnectionRejectException) {
62+
63+
var reasonCode = ((ConnectionRejectException) exception).getReasonCode();
64+
65+
connection
66+
.sendWithFeedback(getPacketOutFactory().newConnectAck(this, reasonCode))
67+
.thenAccept(sent -> connection.close());
68+
69+
return;
70+
}
71+
5872
sessionExpiryInterval = connect.getSessionExpiryInterval();
5973
receiveMax = connect.getReceiveMax();
6074
maximumPacketSize = connect.getMaximumPacketSize();

src/main/java/com/ss/mqtt/broker/network/MqttConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.ss.mqtt.broker.config.MqttConnectionConfig;
44
import com.ss.mqtt.broker.model.MqttVersion;
5-
import com.ss.mqtt.broker.model.QoS;
65
import com.ss.mqtt.broker.network.packet.MqttPacketReader;
76
import com.ss.mqtt.broker.network.packet.MqttPacketWriter;
87
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
@@ -76,7 +75,9 @@ public boolean isSupported(@NotNull MqttVersion mqttVersion) {
7675
channel,
7776
bufferAllocator,
7877
this::updateLastActivity,
79-
this::nextPacketToWrite
78+
this::nextPacketToWrite,
79+
this::onWrittenPacket,
80+
this::onSentPacket
8081
);
8182
}
8283

src/main/java/com/ss/mqtt/broker/network/packet/MqttPacketWriter.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
55
import com.ss.mqtt.broker.util.MqttDataUtils;
66
import com.ss.rlib.network.BufferAllocator;
7+
import com.ss.rlib.network.packet.WritablePacket;
78
import com.ss.rlib.network.packet.impl.AbstractPacketWriter;
89
import org.jetbrains.annotations.NotNull;
910

1011
import java.nio.ByteBuffer;
1112
import java.nio.channels.AsynchronousSocketChannel;
13+
import java.util.function.BiConsumer;
14+
import java.util.function.Consumer;
1215
import java.util.function.Supplier;
1316

1417
public class MqttPacketWriter extends AbstractPacketWriter<MqttWritablePacket, MqttConnection> {
@@ -18,13 +21,23 @@ public MqttPacketWriter(
1821
@NotNull AsynchronousSocketChannel channel,
1922
@NotNull BufferAllocator bufferAllocator,
2023
@NotNull Runnable updateActivityFunction,
21-
@NotNull Supplier<MqttWritablePacket> nextWritePacketSupplier
24+
@NotNull Supplier<@NotNull WritablePacket> nextWritePacketSupplier,
25+
@NotNull Consumer<@NotNull WritablePacket> writtenPacketHandler,
26+
@NotNull BiConsumer<@NotNull WritablePacket, Boolean> sentPacketHandler
2227
) {
23-
super(connection, channel, bufferAllocator, updateActivityFunction, nextWritePacketSupplier);
28+
super(
29+
connection,
30+
channel,
31+
bufferAllocator,
32+
updateActivityFunction,
33+
nextWritePacketSupplier,
34+
writtenPacketHandler,
35+
sentPacketHandler
36+
);
2437
}
2538

2639
@Override
27-
protected int getTotalSize(@NotNull MqttWritablePacket packet, int expectedLength) {
40+
protected int getTotalSize(@NotNull WritablePacket packet, int expectedLength) {
2841
return 1 + MqttDataUtils.sizeOfMbi(expectedLength) + expectedLength;
2942
}
3043

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public class ConnectInPacket extends MqttReadablePacket {
219219
public ConnectInPacket(byte info) {
220220
super(info);
221221
this.userProperties = Array.empty();
222-
this.mqttVersion = MqttVersion.MQTT_5;
222+
this.mqttVersion = MqttVersion.MQTT_3_1_1;
223223
this.clientId = StringUtils.EMPTY;
224224
this.willTopic = StringUtils.EMPTY;
225225
this.username = StringUtils.EMPTY;
@@ -270,7 +270,7 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B
270270
the reserved flag is not 0 it is a Malformed Packet. Refer to section 4.13 for information about handling
271271
errors.
272272
*/
273-
throw new IllegalStateException("non-zero reserved flag");
273+
throw new ConnectionRejectException(ConnectAckReasonCode.MALFORMED_PACKET);
274274
}
275275
}
276276

src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

3+
import com.ss.mqtt.broker.exception.ConnectionRejectException;
4+
import com.ss.mqtt.broker.exception.MqttException;
5+
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
36
import com.ss.mqtt.broker.model.MqttVersion;
47
import com.ss.mqtt.broker.model.PacketProperty;
58
import com.ss.mqtt.broker.model.StringPair;
@@ -11,6 +14,7 @@
1114
import com.ss.rlib.network.packet.impl.AbstractReadablePacket;
1215
import lombok.Getter;
1316
import org.jetbrains.annotations.NotNull;
17+
import org.jetbrains.annotations.Nullable;
1418

1519
import java.nio.ByteBuffer;
1620
import java.nio.charset.StandardCharsets;
@@ -24,6 +28,11 @@ public abstract class MqttReadablePacket extends AbstractReadablePacket<MqttConn
2428
*/
2529
protected @Getter @NotNull Array<StringPair> userProperties;
2630

31+
/**
32+
* The happened exception during parsing this packet.
33+
*/
34+
protected @Getter @Nullable Exception exception;
35+
2736
protected MqttReadablePacket(byte info) {
2837
this.userProperties = Array.empty();
2938
}
@@ -41,6 +50,17 @@ protected void readImpl(@NotNull MqttConnection connection, @NotNull ByteBuffer
4150
readPayload(connection, buffer);
4251
}
4352

53+
@Override
54+
protected void handleException(@NotNull ByteBuffer buffer, @NotNull Exception exception) {
55+
super.handleException(buffer, exception);
56+
57+
if (!(exception instanceof MqttException)) {
58+
exception = new ConnectionRejectException(exception, ConnectAckReasonCode.PROTOCOL_ERROR);
59+
}
60+
61+
this.exception = exception;
62+
}
63+
4464
protected boolean isPropertiesSupported(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) {
4565
return connection.isSupported(MqttVersion.MQTT_5);
4666
}

0 commit comments

Comments
 (0)