Skip to content

Commit 5fde179

Browse files
eded
authored and committed
refactor(websocket): extract websocket message types
1 parent 56ab182 commit 5fde179

File tree

16 files changed

+388
-325
lines changed

16 files changed

+388
-325
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,12 @@
6666
</dependency>
6767

6868

69+
<dependency>
70+
<groupId>org.projectlombok</groupId>
71+
<artifactId>lombok</artifactId>
72+
<version>1.18.32</version>
73+
<scope>provided</scope>
74+
</dependency>
6975
<!--
7076
<dependency>
7177
<groupId>org.springframework.ai</groupId>

src/main/java/com/xiaozhi/communication/common/ChatSession.java

Lines changed: 8 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@
44
import com.xiaozhi.dialogue.llm.tool.ToolsSessionHolder;
55
import com.xiaozhi.entity.SysDevice;
66
import com.xiaozhi.entity.SysRole;
7+
import com.xiaozhi.enums.ListenMode;
8+
import lombok.Data;
79
import org.springframework.ai.chat.memory.ChatMemory;
810
import org.springframework.ai.chat.messages.Message;
911
import org.springframework.ai.tool.ToolCallback;
@@ -15,6 +17,7 @@
1517
import java.util.Map;
1618
import java.util.concurrent.ConcurrentHashMap;
1719

20+
@Data
1821
public abstract class ChatSession {
1922
/**
2023
* 当前会话的sessionId
@@ -49,10 +52,14 @@ public abstract class ChatSession {
4952
* 是否正在说话
5053
*/
5154
protected boolean playing;
55+
/**
56+
* 客户端停止说话
57+
*/
58+
protected boolean clientVoiceStop = false;
5259
/**
5360
* 设备状态(auto, realTime)
5461
*/
55-
protected String mode;
62+
protected ListenMode mode;
5663
/**
5764
* 会话的音频数据流
5865
*/
@@ -109,37 +116,6 @@ public String getAssistantAudioPath() {
109116
return dialogueId == null ? "": (String) getAttribute("assistantAudioPath_" + dialogueId);
110117
}
111118

112-
public String getSessionId() {
113-
return sessionId;
114-
}
115-
116-
public void setSessionId(String sessionId) {
117-
this.sessionId = sessionId;
118-
}
119-
120-
public SysDevice getSysDevice() {
121-
return sysDevice;
122-
}
123-
124-
public void setSysDevice(SysDevice sysDevice) {
125-
this.sysDevice = sysDevice;
126-
}
127-
128-
public List<SysRole> getSysRoleList() {
129-
return sysRoleList;
130-
}
131-
132-
public void setSysRoleList(List<SysRole> sysRoleList) {
133-
this.sysRoleList = sysRoleList;
134-
}
135-
136-
public Map<String, IotDescriptor> getIotDescriptors() {
137-
return iotDescriptors;
138-
}
139-
140-
public void setIotDescriptors(Map<String, IotDescriptor> iotDescriptors) {
141-
this.iotDescriptors = iotDescriptors;
142-
}
143119

144120
public ToolsSessionHolder getFunctionSessionHolder() {
145121
return toolsSessionHolder;
@@ -153,70 +129,6 @@ public List<ToolCallback> getToolCallbacks() {
153129
return toolsSessionHolder.getAllFunction();
154130
}
155131

156-
public boolean isCloseAfterChat() {
157-
return closeAfterChat;
158-
}
159-
160-
public void setCloseAfterChat(boolean closeAfterChat) {
161-
this.closeAfterChat = closeAfterChat;
162-
}
163-
164-
public boolean isMusicPlaying() {
165-
return musicPlaying;
166-
}
167-
168-
public void setMusicPlaying(boolean musicPlaying) {
169-
this.musicPlaying = musicPlaying;
170-
}
171-
172-
public boolean isPlaying() {
173-
return playing;
174-
}
175-
176-
public void setPlaying(boolean playing) {
177-
this.playing = playing;
178-
}
179-
180-
public String getMode() {
181-
return mode;
182-
}
183-
184-
public void setMode(String mode) {
185-
this.mode = mode;
186-
}
187-
188-
public Sinks.Many<byte[]> getAudioSinks() {
189-
return audioSinks;
190-
}
191-
192-
public void setAudioSinks(Sinks.Many<byte[]> audioSinks) {
193-
this.audioSinks = audioSinks;
194-
}
195-
196-
public boolean isStreamingState() {
197-
return streamingState;
198-
}
199-
200-
public void setStreamingState(boolean streamingState) {
201-
this.streamingState = streamingState;
202-
}
203-
204-
public Instant getLastActivityTime() {
205-
return lastActivityTime;
206-
}
207-
208-
public void setLastActivityTime(Instant lastActivityTime) {
209-
this.lastActivityTime = lastActivityTime;
210-
}
211-
212-
public ChatMemory getChatMemory() {
213-
return chatMemory;
214-
}
215-
216-
public void setChatMemory(ChatMemory chatMemory) {
217-
this.chatMemory = chatMemory;
218-
}
219-
220132
public void clearMemory() {
221133
chatMemory.clear(sessionId);
222134
}

src/main/java/com/xiaozhi/communication/common/MessageHandler.java

Lines changed: 32 additions & 102 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
11
package com.xiaozhi.communication.common;
22

33
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.xiaozhi.communication.domain.AbortMessage;
5+
import com.xiaozhi.communication.domain.GoodbyeMessage;
6+
import com.xiaozhi.communication.domain.ListenMessage;
47
import com.xiaozhi.dialogue.llm.ChatService;
58
import com.xiaozhi.dialogue.llm.factory.ChatModelFactory;
69
import com.xiaozhi.dialogue.llm.tool.ToolsGlobalRegistry;
@@ -13,6 +16,7 @@
1316
import com.xiaozhi.dialogue.tts.factory.TtsServiceFactory;
1417
import com.xiaozhi.entity.SysConfig;
1518
import com.xiaozhi.entity.SysDevice;
19+
import com.xiaozhi.enums.ListenState;
1620
import com.xiaozhi.service.SysDeviceService;
1721
import com.xiaozhi.service.SysRoleService;
1822
import jakarta.annotation.Resource;
@@ -120,7 +124,7 @@ public void afterConnection(ChatSession chatSession, String deviceIdAuth) {
120124
// 注册全局函数
121125
toolsSessionHolder.registerGlobalFunctionTools(chatSession);
122126
}
123-
127+
124128
// 更新设备状态
125129
deviceService.updateNoRefreshCache(new SysDevice()
126130
.setDeviceId(device.getDeviceId())
@@ -173,56 +177,6 @@ public void afterConnectionClosed(String sessionId) {
173177
dialogueService.cleanupSession(sessionId);
174178
}
175179

176-
/**
177-
* 处理文本消息.
178-
* @param sessionId
179-
* @param message
180-
* @param deviceId 可能为null
181-
*/
182-
public void handleTextMessage(String sessionId, JsonNode message, String deviceId) {
183-
ChatSession chatSession = sessionManager.getSession(sessionId);
184-
if(chatSession == null || !chatSession.isOpen()){
185-
return;
186-
}
187-
188-
try {
189-
// 首先尝试解析JSON消息
190-
String messageType = message.path("type").asText();
191-
192-
SysDevice device = sessionManager.getDeviceConfig(sessionId);
193-
// 对于其他消息类型,检查设备是否已绑定
194-
if (device == null) {
195-
// 设备信息不存在,需要查询
196-
Thread.startVirtualThread(() -> {
197-
try {
198-
SysDevice queryDevice = deviceService.selectDeviceById(deviceId);
199-
if (ObjectUtils.isEmpty(queryDevice) || queryDevice.getModelId() == null) {
200-
// 设备未绑定,处理未绑定设备的消息
201-
queryDevice = new SysDevice();
202-
queryDevice.setDeviceId(deviceId);
203-
handleUnboundDevice(sessionId, queryDevice);
204-
} else {
205-
// 更新缓存的设备信息
206-
sessionManager.registerDevice(sessionId, queryDevice);
207-
// 继续处理消息
208-
handleMessageByType(sessionId, message, messageType);
209-
}
210-
} catch (Exception e) {
211-
logger.error("处理消息失败", e);
212-
}
213-
});
214-
} else if (device.getModelId() == null) {
215-
// 设备存在但未绑定模型,直接处理未绑定设备
216-
handleUnboundDevice(sessionId, device);
217-
} else {
218-
// 设备已绑定且信息已缓存,直接处理消息
219-
handleMessageByType(sessionId, message, messageType);
220-
}
221-
} catch (Exception e) {
222-
logger.error("处理文本消息失败", e);
223-
}
224-
}
225-
226180
/**
227181
* 处理音频数据
228182
* @param sessionId
@@ -238,33 +192,7 @@ public void handleBinaryMessage(String sessionId, byte[] opusData) {
238192

239193
}
240194

241-
private void handleMessageByType(String sessionId, JsonNode jsonNode, String messageType) {
242-
ChatSession chatSession = sessionManager.getSession(sessionId);
243-
if(chatSession == null || !chatSession.isOpen()){
244-
return;
245-
} try {
246-
switch (messageType) {
247-
case "listen":
248-
handleListenMessage(chatSession, jsonNode);
249-
break;
250-
case "abort":
251-
dialogueService.abortDialogue(chatSession, jsonNode.path("reason").asText());
252-
break;
253-
case "iot":
254-
handleIotMessage(chatSession, jsonNode);
255-
break;
256-
case "goodbye":
257-
sessionManager.closeSession(chatSession);
258-
break;
259-
default:
260-
logger.warn("未知的消息类型: {}", messageType);
261-
}
262-
} catch (Exception e) {
263-
logger.error("处理消息失败 - 类型: " + messageType, e);
264-
}
265-
}
266-
267-
private void handleUnboundDevice(String sessionId, SysDevice device) {
195+
public void handleUnboundDevice(String sessionId, SysDevice device) {
268196
String deviceId = device.getDeviceId();
269197
ChatSession chatSession = sessionManager.getSession(sessionId);
270198
if(chatSession == null || !chatSession.isOpen()){
@@ -329,58 +257,56 @@ private void handleUnboundDevice(String sessionId, SysDevice device) {
329257
});
330258
}
331259

332-
333-
private void handleListenMessage(ChatSession chatSession, JsonNode jsonNode) {
260+
public void handleListenMessage(ChatSession chatSession, ListenMessage message) {
334261
String sessionId = chatSession.getSessionId();
335-
// 解析listen消息中的state和mode字段
336-
String state = jsonNode.path("state").asText();
337-
String mode = jsonNode.path("mode").asText();
338-
339-
logger.info("收到listen消息 - SessionId: {}, State: {}, Mode: {}", sessionId, state, mode);
340-
sessionManager.setMode(mode);
262+
logger.info("收到listen消息 - SessionId: {}, State: {}, Mode: {}", sessionId, message.getState(), message.getMode());
263+
chatSession.setMode(message.getMode());
341264

342265
// 根据state处理不同的监听状态
343-
switch (state) {
344-
case "start":
266+
switch (message.getState()) {
267+
case ListenState.Start:
345268
// 开始监听,准备接收音频数据
346-
logger.info("开始监听 - Mode: {}", mode);
269+
logger.info("开始监听 - Mode: {}", message.getMode());
347270

348271
// 初始化VAD会话
349272
vadService.initSession(sessionId);
350273
break;
351274

352-
case "stop":
275+
case ListenState.Stop:
353276
// 停止监听
354277
logger.info("停止监听");
355278

279+
chatSession.setClientVoiceStop(true);
356280
// 关闭音频流
357-
sessionManager.closeAudioStream(sessionId);
358-
sessionManager.setStreamingState(sessionId, false);
281+
// sessionManager.closeAudioStream(sessionId);
282+
// sessionManager.setStreamingState(sessionId, false);
359283
// 重置VAD会话
360-
vadService.resetSession(sessionId);
284+
// vadService.resetSession(sessionId);
361285
break;
362286

363-
case "text":
287+
case ListenState.Text:
364288
// 检测聊天文本输入
365-
String text = jsonNode.path("text").asText();
366289
if (audioService.isPlaying(sessionId)) {
367-
dialogueService.abortDialogue(chatSession, mode);
290+
dialogueService.abortDialogue(chatSession, message.getMode().getValue());
368291
}
369-
dialogueService.handleText(chatSession, text, null);
292+
dialogueService.handleText(chatSession, message.getText(), null);
370293
break;
371294

372-
case "detect":
295+
case ListenState.Detect:
373296
// 检测到唤醒词
374-
String wakeWord = jsonNode.path("text").asText();
375-
dialogueService.handleWakeWord(chatSession, wakeWord);
297+
dialogueService.handleWakeWord(chatSession, message.getText());
376298
break;
377299

378300
default:
379-
logger.warn("未知的listen状态: {}", state);
301+
logger.warn("未知的listen状态: {}", message.getState());
380302
}
381303
}
382304

383-
private void handleIotMessage(ChatSession chatSession, JsonNode jsonNode) {
305+
public void handleAbortMessage(ChatSession session, AbortMessage message) {
306+
dialogueService.abortDialogue(session, message.getReason());
307+
}
308+
309+
public void handleIotMessage(ChatSession chatSession, JsonNode jsonNode) {
384310
String sessionId = chatSession.getSessionId();
385311
logger.info("收到IoT消息 - SessionId: {}", sessionId);
386312

@@ -400,4 +326,8 @@ private void handleIotMessage(ChatSession chatSession, JsonNode jsonNode) {
400326
iotService.handleDeviceStates(sessionId, states);
401327
}
402328
}
329+
330+
public void handleGoodbyeMessage(ChatSession session, GoodbyeMessage message) {
331+
sessionManager.closeSession(session);
332+
}
403333
}

0 commit comments

Comments
 (0)