Skip to content

Commit 5347f06

Browse files
authored
Merge pull request #6 from JavaSaBr/feature-broker-5
Implement ClientIdRegistry
2 parents 4507cb0 + 812820e commit 5347f06

File tree

61 files changed

+906
-318
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+906
-318
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ allprojects {
4646
}
4747

4848
dependencies {
49+
4950
implementation "com.spaceshift:rlib.network:$rlibVersion"
5051
implementation "com.spaceshift:rlib.logger.slf4j:$rlibVersion"
5152
implementation "org.springframework.boot:spring-boot-starter:$springbootVersion"
5253
implementation "org.springframework.boot:spring-boot-starter-logging:$springbootVersion"
54+
implementation "io.projectreactor:reactor-core:$projectReactorVersion"
5355

5456
compileOnly "org.jetbrains:annotations:$annotationVersion"
5557
compileOnly "org.projectlombok:lombok:$lombokVersion"

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
import com.ss.mqtt.broker.model.MqttPropertyConstants;
44
import com.ss.mqtt.broker.model.QoS;
55
import com.ss.mqtt.broker.network.MqttConnection;
6-
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
7-
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
6+
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
7+
import com.ss.mqtt.broker.network.client.impl.DeviceMqttClient;
8+
import com.ss.mqtt.broker.network.packet.PacketType;
9+
import com.ss.mqtt.broker.network.packet.in.handler.*;
10+
import com.ss.mqtt.broker.service.ClientIdRegistry;
811
import com.ss.mqtt.broker.service.ClientService;
912
import com.ss.mqtt.broker.service.PublishingService;
1013
import com.ss.mqtt.broker.service.SubscriptionService;
11-
import com.ss.mqtt.broker.service.impl.DefaultClientService;
12-
import com.ss.mqtt.broker.service.impl.SimplePublishingService;
13-
import com.ss.mqtt.broker.service.impl.SimpleSubscriptionService;
14-
import com.ss.mqtt.broker.service.impl.SimpleSubscriptions;
14+
import com.ss.mqtt.broker.service.impl.*;
1515
import com.ss.rlib.network.*;
1616
import com.ss.rlib.network.impl.DefaultBufferAllocator;
1717
import com.ss.rlib.network.server.ServerNetwork;
@@ -52,29 +52,62 @@ private interface ChannelFactory extends
5252
return new DefaultClientService();
5353
}
5454

55+
@NotNull
56+
@Bean ClientIdRegistry clientIdRegistry() {
57+
return new SimpleClientIdRegistry(
58+
env.getProperty(
59+
"client.id.available.chars",
60+
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-"
61+
),
62+
env.getProperty("client.id.max.length", int.class, 36)
63+
);
64+
}
65+
5566
@Bean
56-
@NotNull Network<? extends Connection<MqttReadablePacket, MqttWritablePacket>> network(
57-
@NotNull ServerNetworkConfig networkConfig,
58-
@NotNull BufferAllocator bufferAllocator,
59-
@NotNull Consumer<MqttConnection> mqttConnectionConsumer,
60-
@NotNull MqttConnectionConfig connectionConfig,
67+
PacketInHandler @NotNull [] devicePacketHandlers(
68+
@NotNull ClientIdRegistry clientIdRegistry,
6169
@NotNull SubscriptionService subscriptionService,
6270
@NotNull PublishingService publishingService
6371
) {
64-
ServerNetwork<MqttConnection> serverNetwork = NetworkFactory.newServerNetwork(
72+
73+
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
74+
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(clientIdRegistry);
75+
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
76+
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
77+
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
78+
79+
return handlers;
80+
}
81+
82+
@Bean
83+
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork(
84+
@NotNull ServerNetworkConfig networkConfig,
85+
@NotNull BufferAllocator bufferAllocator,
86+
@NotNull MqttConnectionConfig deviceConnectionConfig,
87+
PacketInHandler @NotNull [] devicePacketHandlers
88+
) {
89+
return NetworkFactory.newServerNetwork(
6590
networkConfig,
66-
networkChannelFactory(
91+
deviceConnectionFactory(
6792
bufferAllocator,
68-
connectionConfig,
69-
subscriptionService,
70-
publishingService
93+
deviceConnectionConfig,
94+
devicePacketHandlers
7195
)
7296
);
97+
}
7398

74-
serverNetwork.start(new InetSocketAddress("localhost", 1883));
75-
serverNetwork.onAccept(mqttConnectionConsumer);
99+
@Bean
100+
@NotNull InetSocketAddress deviceNetworkAddress(
101+
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,
102+
@NotNull Consumer<@NotNull MqttConnection> mqttConnectionConsumer
103+
) {
104+
105+
var address = new InetSocketAddress("localhost", 1883);
106+
107+
deviceNetwork.start(address);
108+
deviceNetwork.onAccept(mqttConnectionConsumer);
76109

77-
return serverNetwork;
110+
return address;
78111
}
79112

80113
@Bean
@@ -88,16 +121,16 @@ private interface ChannelFactory extends
88121
}
89122

90123
@Bean
91-
@NotNull Consumer<MqttConnection> mqttConnectionConsumer(@NotNull ClientService clientService) {
124+
@NotNull Consumer<@NotNull MqttConnection> mqttConnectionConsumer() {
92125
return mqttConnection -> {
93126
log.info("Accepted connection: {}", mqttConnection);
94-
var client = mqttConnection.getClient();
127+
var client = (UnsafeMqttClient) mqttConnection.getClient();
95128
mqttConnection.onReceive((conn, packet) -> client.handle(packet));
96129
};
97130
}
98131

99132
@Bean
100-
@NotNull MqttConnectionConfig mqttConnectionConfig() {
133+
@NotNull MqttConnectionConfig deviceConnectionConfig() {
101134
return new MqttConnectionConfig(
102135
QoS.of(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
103136
env.getProperty(
@@ -128,21 +161,19 @@ private interface ChannelFactory extends
128161
);
129162
}
130163

131-
private @NotNull ChannelFactory networkChannelFactory(
164+
private @NotNull ChannelFactory deviceConnectionFactory(
132165
@NotNull BufferAllocator bufferAllocator,
133166
@NotNull MqttConnectionConfig connectionConfig,
134-
@NotNull SubscriptionService subscriptionService,
135-
@NotNull PublishingService publishingService
167+
PacketInHandler @NotNull [] packetHandlers
136168
) {
137169
return (network, channel) -> new MqttConnection(
138170
network,
139171
channel,
140-
NetworkCryptor.NULL,
141172
bufferAllocator,
142173
100,
143-
subscriptionService,
144-
publishingService,
145-
connectionConfig
174+
packetHandlers,
175+
connectionConfig,
176+
DeviceMqttClient::new
146177
);
147178
}
148179
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.ss.mqtt.broker.exception;
2+
3+
public class MalformedPacketMqttException extends MqttException {}

src/main/java/com/ss/mqtt/broker/network/MqttClient.java

Lines changed: 0 additions & 129 deletions
This file was deleted.

src/main/java/com/ss/mqtt/broker/network/MqttConnection.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
import com.ss.mqtt.broker.config.MqttConnectionConfig;
44
import com.ss.mqtt.broker.model.MqttVersion;
5+
import com.ss.mqtt.broker.network.client.MqttClient;
6+
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
57
import com.ss.mqtt.broker.network.packet.MqttPacketReader;
68
import com.ss.mqtt.broker.network.packet.MqttPacketWriter;
79
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
10+
import com.ss.mqtt.broker.network.packet.in.handler.PacketInHandler;
811
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
9-
import com.ss.mqtt.broker.service.PublishingService;
10-
import com.ss.mqtt.broker.service.SubscriptionService;
1112
import com.ss.rlib.network.BufferAllocator;
1213
import com.ss.rlib.network.Connection;
1314
import com.ss.rlib.network.Network;
@@ -22,40 +23,40 @@
2223
import org.jetbrains.annotations.NotNull;
2324

2425
import java.nio.channels.AsynchronousSocketChannel;
26+
import java.util.function.Function;
2527

2628
@Log4j2
27-
@Getter(AccessLevel.PROTECTED)
2829
public class MqttConnection extends AbstractConnection<MqttReadablePacket, MqttWritablePacket> {
2930

30-
private final PacketReader packetReader;
31-
private final PacketWriter packetWriter;
31+
@Getter(AccessLevel.PROTECTED)
32+
private final @NotNull PacketReader packetReader;
3233

33-
private final SubscriptionService subscriptionService;
34-
private final PublishingService publishingService;
34+
@Getter(AccessLevel.PROTECTED)
35+
private final @NotNull PacketWriter packetWriter;
36+
37+
private final @Getter PacketInHandler @NotNull [] packetHandlers;
3538

3639
private final @Getter @NotNull MqttClient client;
3740
private final @Getter @NotNull MqttConnectionConfig config;
3841

39-
private volatile @Setter @NotNull MqttVersion mqttVersion;
42+
private volatile @Getter @Setter @NotNull MqttVersion mqttVersion;
4043

4144
public MqttConnection(
4245
@NotNull Network<? extends Connection<MqttReadablePacket, MqttWritablePacket>> network,
4346
@NotNull AsynchronousSocketChannel channel,
44-
@NotNull NetworkCryptor crypt,
4547
@NotNull BufferAllocator bufferAllocator,
4648
int maxPacketsByRead,
47-
@NotNull SubscriptionService subscriptionService,
48-
@NotNull PublishingService publishingService,
49-
@NotNull MqttConnectionConfig config
49+
PacketInHandler @NotNull [] packetHandlers,
50+
@NotNull MqttConnectionConfig config,
51+
@NotNull Function<MqttConnection, UnsafeMqttClient> clientFactory
5052
) {
51-
super(network, channel, crypt, bufferAllocator, maxPacketsByRead);
53+
super(network, channel, NetworkCryptor.NULL, bufferAllocator, maxPacketsByRead);
54+
this.packetHandlers = packetHandlers;
5255
this.config = config;
5356
this.mqttVersion = MqttVersion.MQTT_3_1_1;
5457
this.packetReader = createPacketReader();
5558
this.packetWriter = createPacketWriter();
56-
this.client = new MqttClient(this);
57-
this.subscriptionService = subscriptionService;
58-
this.publishingService = publishingService;
59+
this.client = clientFactory.apply(this);
5960
}
6061

6162
public boolean isSupported(@NotNull MqttVersion mqttVersion) {
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.ss.mqtt.broker.network.client;
2+
3+
import com.ss.mqtt.broker.config.MqttConnectionConfig;
4+
import com.ss.mqtt.broker.network.packet.factory.MqttPacketOutFactory;
5+
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
public interface MqttClient {
9+
10+
@NotNull MqttPacketOutFactory getPacketOutFactory();
11+
@NotNull MqttConnectionConfig getConnectionConfig();
12+
@NotNull String getClientId();
13+
14+
int getKeepAlive();
15+
int getMaximumPacketSize();
16+
int getReceiveMax();
17+
int getTopicAliasMaximum();
18+
19+
long getSessionExpiryInterval();
20+
21+
void send(@NotNull MqttWritablePacket packet);
22+
}

0 commit comments

Comments
 (0)