Skip to content

Commit 4507cb0

Browse files
authored
Merge pull request #4 from JavaSaBr/broker-3
implement PublishOutPacket and SimplePublishingService
2 parents 98a7549 + 37bab29 commit 4507cb0

20 files changed

+610
-88
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
77
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
88
import com.ss.mqtt.broker.service.ClientService;
9+
import com.ss.mqtt.broker.service.PublishingService;
910
import com.ss.mqtt.broker.service.SubscriptionService;
1011
import com.ss.mqtt.broker.service.impl.DefaultClientService;
12+
import com.ss.mqtt.broker.service.impl.SimplePublishingService;
1113
import com.ss.mqtt.broker.service.impl.SimpleSubscriptionService;
1214
import com.ss.mqtt.broker.service.impl.SimpleSubscriptions;
1315
import com.ss.rlib.network.*;
@@ -56,14 +58,16 @@ private interface ChannelFactory extends
5658
@NotNull BufferAllocator bufferAllocator,
5759
@NotNull Consumer<MqttConnection> mqttConnectionConsumer,
5860
@NotNull MqttConnectionConfig connectionConfig,
59-
@NotNull SubscriptionService subscriptionService
61+
@NotNull SubscriptionService subscriptionService,
62+
@NotNull PublishingService publishingService
6063
) {
6164
ServerNetwork<MqttConnection> serverNetwork = NetworkFactory.newServerNetwork(
6265
networkConfig,
6366
networkChannelFactory(
6467
bufferAllocator,
6568
connectionConfig,
66-
subscriptionService
69+
subscriptionService,
70+
publishingService
6771
)
6872
);
6973

@@ -78,6 +82,11 @@ private interface ChannelFactory extends
7882
return new SimpleSubscriptionService(new SimpleSubscriptions());
7983
}
8084

85+
@Bean
86+
@NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) {
87+
return new SimplePublishingService(subscriptionService);
88+
}
89+
8190
@Bean
8291
@NotNull Consumer<MqttConnection> mqttConnectionConsumer(@NotNull ClientService clientService) {
8392
return mqttConnection -> {
@@ -122,7 +131,8 @@ private interface ChannelFactory extends
122131
private @NotNull ChannelFactory networkChannelFactory(
123132
@NotNull BufferAllocator bufferAllocator,
124133
@NotNull MqttConnectionConfig connectionConfig,
125-
@NotNull SubscriptionService subscriptionService
134+
@NotNull SubscriptionService subscriptionService,
135+
@NotNull PublishingService publishingService
126136
) {
127137
return (network, channel) -> new MqttConnection(
128138
network,
@@ -131,6 +141,7 @@ private interface ChannelFactory extends
131141
bufferAllocator,
132142
100,
133143
subscriptionService,
144+
publishingService,
134145
connectionConfig
135146
);
136147
}

src/main/java/com/ss/mqtt/broker/network/MqttClient.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import com.ss.mqtt.broker.exception.ConnectionRejectException;
44
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
55
import com.ss.mqtt.broker.model.MqttPropertyConstants;
6+
import com.ss.mqtt.broker.model.PublishAckReasonCode;
67
import com.ss.mqtt.broker.network.packet.PacketType;
78
import com.ss.mqtt.broker.network.packet.factory.MqttPacketOutFactory;
89
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
910
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
1011
import com.ss.mqtt.broker.network.packet.in.SubscribeInPacket;
1112
import com.ss.mqtt.broker.network.packet.in.UnsubscribeInPacket;
1213
import lombok.EqualsAndHashCode;
14+
import com.ss.mqtt.broker.network.packet.in.*;
1315
import lombok.Getter;
1416
import lombok.extern.log4j.Log4j2;
1517
import org.jetbrains.annotations.NotNull;
@@ -46,6 +48,9 @@ public void handle(@NotNull MqttReadablePacket packet) {
4648
case UNSUBSCRIBE:
4749
onUnsubscribe((UnsubscribeInPacket) packet);
4850
break;
51+
case PUBLISH:
52+
onPublish((PublishInPacket) packet);
53+
break;
4954
}
5055
}
5156

@@ -90,7 +95,8 @@ private void onSubscribe(@NotNull SubscribeInPacket subscribe) {
9095
var ackReasonCodes = connection.getSubscriptionService()
9196
.subscribe(connection.getClient(), subscribe.getTopicFilters());
9297

93-
connection.send(getPacketOutFactory().newSubscribeAck(connection.getClient(),
98+
connection.send(getPacketOutFactory().newSubscribeAck(
99+
connection.getClient(),
94100
subscribe.getPacketId(),
95101
ackReasonCodes
96102
));
@@ -101,13 +107,23 @@ private void onUnsubscribe(@NotNull UnsubscribeInPacket subscribe) {
101107
var ackReasonCodes = connection.getSubscriptionService()
102108
.unsubscribe(connection.getClient(), subscribe.getTopicFilters());
103109

104-
connection.send(getPacketOutFactory().newUnsubscribeAck(connection.getClient(),
110+
connection.send(getPacketOutFactory().newUnsubscribeAck(
111+
connection.getClient(),
105112
subscribe.getPacketId(),
106113
ackReasonCodes
107114
));
108115
}
109116

110-
private @NotNull MqttPacketOutFactory getPacketOutFactory() {
117+
public void onPublish(@NotNull PublishInPacket publish) {
118+
var ackReasonCode = connection.getPublishingService().publish(publish);
119+
connection.send(getPacketOutFactory().newPublishAck(
120+
connection.getClient(),
121+
publish.getPacketId(),
122+
ackReasonCode
123+
));
124+
}
125+
126+
public @NotNull MqttPacketOutFactory getPacketOutFactory() {
111127
return connection.getMqttVersion().getPacketOutFactory();
112128
}
113129
}

src/main/java/com/ss/mqtt/broker/network/MqttConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.ss.mqtt.broker.network.packet.MqttPacketWriter;
77
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
88
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
9+
import com.ss.mqtt.broker.service.PublishingService;
910
import com.ss.mqtt.broker.service.SubscriptionService;
1011
import com.ss.rlib.network.BufferAllocator;
1112
import com.ss.rlib.network.Connection;
@@ -30,6 +31,7 @@ public class MqttConnection extends AbstractConnection<MqttReadablePacket, MqttW
3031
private final PacketWriter packetWriter;
3132

3233
private final SubscriptionService subscriptionService;
34+
private final PublishingService publishingService;
3335

3436
private final @Getter @NotNull MqttClient client;
3537
private final @Getter @NotNull MqttConnectionConfig config;
@@ -43,6 +45,7 @@ public MqttConnection(
4345
@NotNull BufferAllocator bufferAllocator,
4446
int maxPacketsByRead,
4547
@NotNull SubscriptionService subscriptionService,
48+
@NotNull PublishingService publishingService,
4649
@NotNull MqttConnectionConfig config
4750
) {
4851
super(network, channel, crypt, bufferAllocator, maxPacketsByRead);
@@ -52,6 +55,7 @@ public MqttConnection(
5255
this.packetWriter = createPacketWriter();
5356
this.client = new MqttClient(this);
5457
this.subscriptionService = subscriptionService;
58+
this.publishingService = publishingService;
5559
}
5660

5761
public boolean isSupported(@NotNull MqttVersion mqttVersion) {

src/main/java/com/ss/mqtt/broker/network/packet/factory/Mqtt311PacketOutFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
2626
return new ConnectAck311OutPacket(client, reasonCode, sessionPresent);
2727
}
2828

29+
@Override
30+
public @NotNull MqttWritablePacket newPublish(
31+
@NotNull MqttClient client,
32+
int packetId,
33+
@NotNull QoS qos,
34+
boolean retained,
35+
boolean duplicate,
36+
@NotNull String topicName,
37+
int topicAlias,
38+
@NotNull byte[] payload,
39+
boolean stringPayload,
40+
@NotNull String responseTopic,
41+
@NotNull byte[] correlationData,
42+
@NotNull Array<StringPair> userProperties
43+
) {
44+
return newPublish(client, packetId, qos, retained, duplicate, topicName, payload);
45+
}
46+
2947
@Override
3048
public @NotNull MqttWritablePacket newPublishAck(
3149
@NotNull MqttClient client,

src/main/java/com/ss/mqtt/broker/network/packet/factory/Mqtt5PacketOutFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,37 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
3939
);
4040
}
4141

42+
@Override
43+
public @NotNull MqttWritablePacket newPublish(
44+
@NotNull MqttClient client,
45+
int packetId,
46+
@NotNull QoS qos,
47+
boolean retained,
48+
boolean duplicate,
49+
@NotNull String topicName,
50+
int topicAlias,
51+
@NotNull byte[] payload,
52+
boolean stringPayload,
53+
@NotNull String responseTopic,
54+
@NotNull byte[] correlationData,
55+
@NotNull Array<StringPair> userProperties
56+
) {
57+
return new Publish5OutPacket(
58+
client,
59+
packetId,
60+
qos,
61+
retained,
62+
duplicate,
63+
topicName,
64+
topicAlias,
65+
payload,
66+
stringPayload,
67+
responseTopic,
68+
correlationData,
69+
userProperties
70+
);
71+
}
72+
4273
@Override
4374
public @NotNull MqttWritablePacket newPublishAck(
4475
@NotNull MqttClient client,

src/main/java/com/ss/mqtt/broker/network/packet/factory/MqttPacketOutFactory.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,46 @@ public abstract class MqttPacketOutFactory {
7070
}
7171

7272

73+
public @NotNull MqttWritablePacket newPublish(
74+
@NotNull MqttClient client,
75+
int packetId,
76+
@NotNull QoS qos,
77+
boolean retained,
78+
boolean duplicate,
79+
@NotNull String topicName,
80+
@NotNull byte[] payload
81+
) {
82+
return newPublish(
83+
client,
84+
packetId,
85+
qos,
86+
retained,
87+
duplicate,
88+
topicName,
89+
0,
90+
payload,
91+
false,
92+
StringUtils.EMPTY,
93+
ArrayUtils.EMPTY_BYTE_ARRAY,
94+
Array.empty()
95+
);
96+
}
97+
98+
public abstract @NotNull MqttWritablePacket newPublish(
99+
@NotNull MqttClient client,
100+
int packetId,
101+
@NotNull QoS qos,
102+
boolean retained,
103+
boolean duplicate,
104+
@NotNull String topicName,
105+
int topicAlias,
106+
@NotNull byte[] payload,
107+
boolean stringPayload,
108+
@NotNull String responseTopic,
109+
@NotNull byte[] correlationData,
110+
@NotNull Array<StringPair> userProperties
111+
);
112+
73113
public abstract @NotNull MqttWritablePacket newPublishAck(
74114
@NotNull MqttClient client,
75115
int packetId,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.ss.mqtt.broker.network.packet.out;
2+
3+
import com.ss.mqtt.broker.model.QoS;
4+
import com.ss.mqtt.broker.network.MqttClient;
5+
import com.ss.mqtt.broker.network.packet.PacketType;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
import java.nio.ByteBuffer;
9+
10+
public class Publish311OutPacket extends MqttWritablePacket {
11+
12+
private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH.ordinal();
13+
14+
private final int packetId;
15+
private final boolean retained;
16+
private final boolean duplicate;
17+
private final @NotNull QoS qos;
18+
private final @NotNull byte[] payload;
19+
private final @NotNull String topicName;
20+
21+
public Publish311OutPacket(
22+
@NotNull MqttClient client,
23+
int packetId,
24+
@NotNull QoS qos,
25+
boolean retained,
26+
boolean duplicate,
27+
@NotNull String topicName,
28+
@NotNull byte[] payload
29+
) {
30+
super(client);
31+
this.qos = qos;
32+
this.retained = retained;
33+
this.duplicate = duplicate;
34+
this.packetId = packetId;
35+
this.payload = payload;
36+
this.topicName = topicName;
37+
}
38+
39+
@Override
40+
public int getExpectedLength() {
41+
return 7 + payload.length;
42+
}
43+
44+
@Override
45+
protected byte getPacketFlags() {
46+
47+
byte info = (byte) (qos.ordinal() << 1);
48+
49+
if (retained) {
50+
info |= 0x01;
51+
}
52+
53+
if (duplicate) {
54+
info |= 0x08;
55+
}
56+
57+
return info;
58+
}
59+
60+
@Override
61+
protected byte getPacketType() {
62+
return PACKET_TYPE;
63+
}
64+
65+
@Override
66+
protected void writeVariableHeader(@NotNull ByteBuffer buffer) {
67+
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800412
68+
writeString(buffer, topicName);
69+
writeShort(buffer, packetId);
70+
}
71+
72+
@Override
73+
protected void writePayload(@NotNull ByteBuffer buffer) {
74+
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800413
75+
writeBytes(buffer, payload);
76+
}
77+
}

0 commit comments

Comments
 (0)