From 86d2208a539028f220528839d2fcbd296dcdd147 Mon Sep 17 00:00:00 2001 From: "fangjf2001@aliyun.com" Date: Wed, 19 Mar 2025 22:07:24 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20aishengyun=20tts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + main/xiaozhi-server/core/connection.py | 3 +- .../core/providers/tts/aishengyun.py | 74 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 main/xiaozhi-server/core/providers/tts/aishengyun.py diff --git a/.gitignore b/.gitignore index 698d638c04..0236b8b537 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Byte-compiled / optimized / DLL files __pycache__/ .idea/ +.vscode/ *.py[cod] *$py.class diff --git a/main/xiaozhi-server/core/connection.py b/main/xiaozhi-server/core/connection.py index 0ff1ac0e74..fab9e4017d 100644 --- a/main/xiaozhi-server/core/connection.py +++ b/main/xiaozhi-server/core/connection.py @@ -2,6 +2,7 @@ import json import uuid import time +import copy import queue import asyncio import traceback @@ -141,7 +142,7 @@ async def handle_connection(self, ws): self.websocket = ws self.session_id = str(uuid.uuid4()) - self.welcome_msg = self.config["xiaozhi"] + self.welcome_msg = copy.deepcopy(self.config["xiaozhi"]) self.welcome_msg["session_id"] = self.session_id await self.websocket.send(json.dumps(self.welcome_msg)) diff --git a/main/xiaozhi-server/core/providers/tts/aishengyun.py b/main/xiaozhi-server/core/providers/tts/aishengyun.py new file mode 100644 index 0000000000..2adf3a8447 --- /dev/null +++ b/main/xiaozhi-server/core/providers/tts/aishengyun.py @@ -0,0 +1,74 @@ +import os +import uuid +import json +import base64 +import requests +import websockets +from datetime import datetime +#from core.utils.util import check_model_key +from core.providers.tts.base import TTSProviderBase + + +class TTSProvider(TTSProviderBase): + def __init__(self, config, delete_audio_file): + super().__init__(config, delete_audio_file) + self.access_token = config.get("access_token") + self.voice = config.get("voice") + self.model_id = config.get("model_id") + self.api_url = config.get("api_url") + self.language = config.get("language") + self.response_format = config.get("response_format") + self.headers = { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json" + } + + def generate_filename(self, extension=".wav"): + return os.path.join(self.output_file, f"tts-{datetime.now().date()}@{uuid.uuid4().hex}{extension}") + + async def text_to_speak(self, text, output_file): + request_json = { + "model_id": self.model_id, + "voice": { + "mode": "id", + "id": self.voice + }, + "output_format": { + "container": self.response_format, + "encoding": "pcm_s16le", + "sample_rate": 16000 + }, + "language": self.language, + "transcript": text + } + + try: + resp = requests.post(self.api_url, json.dumps(request_json), headers=self.headers) + #file_name = resp.headers["content-disposition"].split("filename=")[1] + with open(output_file, "wb") as file: + file.write(resp.content) + except Exception as e: + raise Exception(f"{__name__} error: {e}") + +async def main(): + from config.settings import load_config, check_config_file + from core.utils import tts + #check_config_file() + config = load_config() + #print(config) + instance = tts.create_instance( + config["selected_module"]["TTS"] + if not 'type' in config["TTS"][config["selected_module"]["TTS"]] + else + config["TTS"][config["selected_module"]["TTS"]]["type"], + config["TTS"][config["selected_module"]["TTS"]], + config["delete_audio"] + ) + file = instance.generate_filename() + print(file) + await instance.text_to_speak("你好", file) + +if __name__ == "__main__": + import asyncio + asyncio.run(main()) + \ No newline at end of file From 3faae85713712c945706f520572dec4dc23bc06b Mon Sep 17 00:00:00 2001 From: "fangjf2001@aliyun.com" Date: Fri, 21 Mar 2025 20:56:10 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=8A=8A=20ConnectionHandler=20=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=20threading=20=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=94=B9=E6=88=90=20asyncio=20=E7=9B=B8=E5=85=B3=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main/xiaozhi-server/core/connection.py | 138 +++++++++--------- .../core/handle/intentHandler.py | 4 +- .../core/handle/receiveAudioHandle.py | 4 +- 3 files changed, 74 insertions(+), 72 deletions(-) diff --git a/main/xiaozhi-server/core/connection.py b/main/xiaozhi-server/core/connection.py index ea6f3c17e9..c85b9585de 100644 --- a/main/xiaozhi-server/core/connection.py +++ b/main/xiaozhi-server/core/connection.py @@ -3,11 +3,9 @@ import uuid import time import copy -import queue import asyncio import traceback -import threading import websockets from typing import Dict, Any import plugins_func.loadplugins @@ -15,7 +13,6 @@ from core.utils.dialogue import Message, Dialogue from core.handle.textHandle import handleTextMessage from core.utils.util import get_string_no_punctuation_or_emoji, extract_json_from_string, get_ip_info -from concurrent.futures import ThreadPoolExecutor, TimeoutError from core.handle.sendAudioHandle import sendAudioMessage from core.handle.receiveAudioHandle import handleAudioMessage from core.handle.functionHandler import FunctionHandler @@ -49,12 +46,12 @@ def __init__(self, config: Dict[str, Any], _vad, _asr, _llm, _tts, _memory, _int self.client_abort = False self.client_listen_mode = "auto" - # 线程任务相关 - self.loop = asyncio.get_event_loop() - self.stop_event = threading.Event() - self.tts_queue = queue.Queue() - self.audio_play_queue = queue.Queue() - self.executor = ThreadPoolExecutor(max_workers=10) + # 异步任务相关 + self.loop = asyncio.get_running_loop() + self.stop_event = asyncio.Event() + self.tts_queue = asyncio.Queue() + self.audio_play_queue = asyncio.Queue() + self.background_tasks = set() # 依赖的组件 self.vad = _vad @@ -153,13 +150,15 @@ async def handle_connection(self, ws): # 异步初始化 await self.loop.run_in_executor(None, self._initialize_components) - # tts 消化线程 - tts_priority = threading.Thread(target=self._tts_priority_thread, daemon=True) - tts_priority.start() + # 启动TTS任务 + tts_task = asyncio.create_task(self._tts_priority_task()) + self.background_tasks.add(tts_task) + tts_task.add_done_callback(self.background_tasks.discard) - # 音频播放 消化线程 - audio_play_priority = threading.Thread(target=self._audio_play_priority_thread, daemon=True) - audio_play_priority.start() + # 启动音频播放任务 + audio_play_task = asyncio.create_task(self._audio_play_priority_task()) + self.background_tasks.add(audio_play_task) + audio_play_task.add_done_callback(self.background_tasks.discard) try: async for message in self.websocket: @@ -214,8 +213,7 @@ async def _check_and_broadcast_auth_code(self): # 发送验证码语音提示 text = f"请在后台输入验证码:{' '.join(auth_code)}" self.recode_first_last_text(text) - future = self.executor.submit(self.speak_and_play, text) - self.tts_queue.put(future) + await self.tts_queue.put((text, 0)) return False return True @@ -226,11 +224,10 @@ def isNeedAuth(self): return False return not self.is_device_verified - def chat(self, query): + async def chat(self, query): if self.isNeedAuth(): self.llm_finish_task = True - future = asyncio.run_coroutine_threadsafe(self._check_and_broadcast_auth_code(), self.loop) - future.result() + await self._check_and_broadcast_auth_code() return True self.dialogue.put(Message(role="user", content=query)) @@ -240,8 +237,7 @@ def chat(self, query): try: start_time = time.time() # 使用带记忆的对话 - future = asyncio.run_coroutine_threadsafe(self.memory.query_memory(query), self.loop) - memory_str = future.result() + memory_str = await self.memory.query_memory(query) self.logger.bind(tag=TAG).debug(f"记忆内容: {memory_str}") llm_responses = self.llm.response( @@ -284,8 +280,7 @@ def chat(self, query): # segment_text = " " text_index += 1 self.recode_first_last_text(segment_text, text_index) - future = self.executor.submit(self.speak_and_play, segment_text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((segment_text, text_index)) processed_chars += len(segment_text_raw) # 更新已处理字符位置 # 处理最后剩余的文本 @@ -296,21 +291,24 @@ def chat(self, query): if segment_text: text_index += 1 self.recode_first_last_text(segment_text, text_index) - future = self.executor.submit(self.speak_and_play, segment_text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((segment_text, text_index)) self.llm_finish_task = True self.dialogue.put(Message(role="assistant", content="".join(response_message))) self.logger.bind(tag=TAG).debug(json.dumps(self.dialogue.get_llm_dialogue(), indent=4, ensure_ascii=False)) return True - def chat_with_function_calling(self, query, tool_call=False): + def create_chat_task(self, query): + task = asyncio.create_task(self.chat(query)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + + async def chat_with_function_calling(self, query, tool_call=False): self.logger.bind(tag=TAG).debug(f"Chat with function calling start: {query}") """Chat with function calling for intent detection using streaming""" if self.isNeedAuth(): self.llm_finish_task = True - future = asyncio.run_coroutine_threadsafe(self._check_and_broadcast_auth_code(), self.loop) - future.result() + await self._check_and_broadcast_auth_code() return True if not tool_call: @@ -326,8 +324,7 @@ def chat_with_function_calling(self, query, tool_call=False): start_time = time.time() # 使用带记忆的对话 - future = asyncio.run_coroutine_threadsafe(self.memory.query_memory(query), self.loop) - memory_str = future.result() + memory_str = await self.memory.query_memory(query) # self.logger.bind(tag=TAG).info(f"对话记录: {self.dialogue.get_llm_dialogue_with_memory(memory_str)}") @@ -397,8 +394,7 @@ def chat_with_function_calling(self, query, tool_call=False): if segment_text: text_index += 1 self.recode_first_last_text(segment_text, text_index) - future = self.executor.submit(self.speak_and_play, segment_text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((segment_text, text_index)) processed_chars += len(segment_text_raw) # 更新已处理字符位置 # 处理function call @@ -431,7 +427,7 @@ def chat_with_function_calling(self, query, tool_call=False): "arguments": function_arguments } result = self.func_handler.handle_llm_function_call(self, function_call_data) - self._handle_function_result(result, function_call_data, text_index + 1) + await self._handle_function_result(result, function_call_data, text_index + 1) # 处理最后剩余的文本 full_text = "".join(response_message) @@ -441,8 +437,7 @@ def chat_with_function_calling(self, query, tool_call=False): if segment_text: text_index += 1 self.recode_first_last_text(segment_text, text_index) - future = self.executor.submit(self.speak_and_play, segment_text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((segment_text, text_index)) # 存储对话内容 if len(response_message) > 0: @@ -453,12 +448,16 @@ def chat_with_function_calling(self, query, tool_call=False): return True - def _handle_function_result(self, result, function_call_data, text_index): + def create_chat_with_function_calling_task(self, query, tool_call=False): + task = asyncio.create_task(self.chat_with_function_calling(query, tool_call)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + + async def _handle_function_result(self, result, function_call_data, text_index): if result.action == Action.RESPONSE: # 直接回复前端 text = result.response self.recode_first_last_text(text, text_index) - future = self.executor.submit(self.speak_and_play, text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((text, text_index)) self.dialogue.put(Message(role="assistant", content=text)) elif result.action == Action.REQLLM: # 调用函数后再请求llm生成回复 @@ -475,33 +474,32 @@ def _handle_function_result(self, result, function_call_data, text_index): "index": 0}])) self.dialogue.put(Message(role="tool", tool_call_id=function_id, content=text)) - self.chat_with_function_calling(text, tool_call=True) + await self.chat_with_function_calling(text, tool_call=True) elif result.action == Action.NOTFOUND: text = result.result self.recode_first_last_text(text, text_index) - future = self.executor.submit(self.speak_and_play, text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((text, text_index)) self.dialogue.put(Message(role="assistant", content=text)) else: text = result.result self.recode_first_last_text(text, text_index) - future = self.executor.submit(self.speak_and_play, text, text_index) - self.tts_queue.put(future) + await self.tts_queue.put((text, text_index)) self.dialogue.put(Message(role="assistant", content=text)) - def _tts_priority_thread(self): + async def _tts_priority_task(self): while not self.stop_event.is_set(): text = None try: - future = self.tts_queue.get() - if future is None: + text, text_index = await self.tts_queue.get() + if text is None: continue - text = None - opus_datas, text_index, tts_file = [], 0, None + + opus_datas, tts_file = [], None try: self.logger.bind(tag=TAG).debug("正在处理TTS任务...") tts_timeout = self.config.get("tts_timeout", 10) - tts_file, text, text_index = future.result(timeout=tts_timeout) + tts_file, text, text_index = await asyncio.wait_for(self.speak_and_play(text, text_index), timeout=tts_timeout) + if text is None or len(text) <= 0: self.logger.bind(tag=TAG).error(f"TTS出错:{text_index}: tts text is empty") elif tts_file is None: @@ -512,40 +510,40 @@ def _tts_priority_thread(self): opus_datas, duration = self.tts.audio_to_opus_data(tts_file) else: self.logger.bind(tag=TAG).error(f"TTS出错:文件不存在{tts_file}") - except TimeoutError: + except asyncio.TimeoutError | TimeoutError: self.logger.bind(tag=TAG).error("TTS超时") except Exception as e: self.logger.bind(tag=TAG).error(f"TTS出错: {e}") + if not self.client_abort: # 如果没有中途打断就发送语音 - self.audio_play_queue.put((opus_datas, text, text_index)) + await self.audio_play_queue.put((opus_datas, text, text_index)) + if self.tts.delete_audio_file and tts_file is not None and os.path.exists(tts_file): os.remove(tts_file) except Exception as e: self.logger.bind(tag=TAG).error(f"TTS任务处理错误: {e}") self.clearSpeakStatus() - asyncio.run_coroutine_threadsafe( - self.websocket.send(json.dumps({"type": "tts", "state": "stop", "session_id": self.session_id})), - self.loop - ) - self.logger.bind(tag=TAG).error(f"tts_priority priority_thread: {text} {e}") + await self.websocket.send(json.dumps({"type": "tts", "state": "stop", "session_id": self.session_id})) + self.logger.bind(tag=TAG).error(f"tts_priority task: {text} {e}") - def _audio_play_priority_thread(self): + async def _audio_play_priority_task(self): while not self.stop_event.is_set(): text = None try: - opus_datas, text, text_index = self.audio_play_queue.get() - future = asyncio.run_coroutine_threadsafe(sendAudioMessage(self, opus_datas, text, text_index), - self.loop) - future.result() + opus_datas, text, text_index = await self.audio_play_queue.get() + await sendAudioMessage(self, opus_datas, text, text_index) except Exception as e: - self.logger.bind(tag=TAG).error(f"audio_play_priority priority_thread: {text} {e}") + self.logger.bind(tag=TAG).error(f"audio_play_priority task: {text} {e}") - def speak_and_play(self, text, text_index=0): + async def speak_and_play(self, text, text_index=0): if text is None or len(text) <= 0: self.logger.bind(tag=TAG).info(f"无需tts转换,query为空,{text}") return None, text, text_index - tts_file = self.tts.to_tts(text) + + # 使用事件循环运行同步的TTS方法 + tts_file = await self.loop.run_in_executor(None, self.tts.to_tts, text) + if tts_file is None: self.logger.bind(tag=TAG).error(f"tts转换失败,{text}") return None, text, text_index @@ -569,7 +567,6 @@ async def close(self): # 清理其他资源 self.stop_event.set() - self.executor.shutdown(wait=False) if self.websocket: await self.websocket.close() self.logger.bind(tag=TAG).info("连接资源已释放") @@ -581,13 +578,18 @@ def reset_vad_states(self): self.client_voice_stop = False self.logger.bind(tag=TAG).debug("VAD states reset.") - def chat_and_close(self, text): + async def chat_and_close(self, text): """Chat with the user and then close the connection""" try: # Use the existing chat method - self.chat(text) + await self.chat(text) # After chat is complete, close the connection self.close_after_chat = True except Exception as e: self.logger.bind(tag=TAG).error(f"Chat and close error: {str(e)}") + + def create_chat_and_close_task(self, text): + task = asyncio.create_task(self.chat_and_close(text)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) diff --git a/main/xiaozhi-server/core/handle/intentHandler.py b/main/xiaozhi-server/core/handle/intentHandler.py index 1799516cee..f931574eb3 100644 --- a/main/xiaozhi-server/core/handle/intentHandler.py +++ b/main/xiaozhi-server/core/handle/intentHandler.py @@ -67,7 +67,7 @@ async def process_intent_result(conn, intent, original_text): logger.bind(tag=TAG).info(f"识别到退出意图: {intent}") # 如果是明确的离别意图,发送告别语并关闭连接 await send_stt_message(conn, original_text) - conn.executor.submit(conn.chat_and_close, original_text) + conn.create_chat_and_close_task(original_text) return True # 处理播放音乐意图 @@ -106,4 +106,4 @@ def extract_text_in_brackets(s): if left_bracket_index != -1 and right_bracket_index != -1 and left_bracket_index < right_bracket_index: return s[left_bracket_index + 1:right_bracket_index] else: - return "" \ No newline at end of file + return "" diff --git a/main/xiaozhi-server/core/handle/receiveAudioHandle.py b/main/xiaozhi-server/core/handle/receiveAudioHandle.py index c5b4135caf..7a11a65e96 100644 --- a/main/xiaozhi-server/core/handle/receiveAudioHandle.py +++ b/main/xiaozhi-server/core/handle/receiveAudioHandle.py @@ -57,9 +57,9 @@ async def startToChat(conn, text): await send_stt_message(conn, text) if conn.use_function_call_mode: # 使用支持function calling的聊天方法 - conn.executor.submit(conn.chat_with_function_calling, text) + conn.create_chat_with_function_calling_task(text) else: - conn.executor.submit(conn.chat, text) + conn.create_chat_task(text) async def no_voice_close_connect(conn): From f4a4998d9e57027d0e02677581008aa4094c081e Mon Sep 17 00:00:00 2001 From: "fangjf2001@aliyun.com" Date: Sun, 23 Mar 2025 10:38:04 +0800 Subject: [PATCH 3/4] =?UTF-8?q?fix=20except=20=E8=AF=AD=E5=8F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main/xiaozhi-server/core/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main/xiaozhi-server/core/connection.py b/main/xiaozhi-server/core/connection.py index c85b9585de..d0132a8d88 100644 --- a/main/xiaozhi-server/core/connection.py +++ b/main/xiaozhi-server/core/connection.py @@ -510,7 +510,7 @@ async def _tts_priority_task(self): opus_datas, duration = self.tts.audio_to_opus_data(tts_file) else: self.logger.bind(tag=TAG).error(f"TTS出错:文件不存在{tts_file}") - except asyncio.TimeoutError | TimeoutError: + except asyncio.TimeoutError: self.logger.bind(tag=TAG).error("TTS超时") except Exception as e: self.logger.bind(tag=TAG).error(f"TTS出错: {e}") From 6551b329ee447bf5fac4d0426d79057baa6f00be Mon Sep 17 00:00:00 2001 From: "fangjf2001@aliyun.com" Date: Mon, 24 Mar 2025 15:36:43 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=88=A0=E9=99=A4=20aishengyun=20tts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/providers/tts/aishengyun.py | 74 ------------------- 1 file changed, 74 deletions(-) delete mode 100644 main/xiaozhi-server/core/providers/tts/aishengyun.py diff --git a/main/xiaozhi-server/core/providers/tts/aishengyun.py b/main/xiaozhi-server/core/providers/tts/aishengyun.py deleted file mode 100644 index 2adf3a8447..0000000000 --- a/main/xiaozhi-server/core/providers/tts/aishengyun.py +++ /dev/null @@ -1,74 +0,0 @@ -import os -import uuid -import json -import base64 -import requests -import websockets -from datetime import datetime -#from core.utils.util import check_model_key -from core.providers.tts.base import TTSProviderBase - - -class TTSProvider(TTSProviderBase): - def __init__(self, config, delete_audio_file): - super().__init__(config, delete_audio_file) - self.access_token = config.get("access_token") - self.voice = config.get("voice") - self.model_id = config.get("model_id") - self.api_url = config.get("api_url") - self.language = config.get("language") - self.response_format = config.get("response_format") - self.headers = { - "Authorization": f"Bearer {self.access_token}", - "Content-Type": "application/json" - } - - def generate_filename(self, extension=".wav"): - return os.path.join(self.output_file, f"tts-{datetime.now().date()}@{uuid.uuid4().hex}{extension}") - - async def text_to_speak(self, text, output_file): - request_json = { - "model_id": self.model_id, - "voice": { - "mode": "id", - "id": self.voice - }, - "output_format": { - "container": self.response_format, - "encoding": "pcm_s16le", - "sample_rate": 16000 - }, - "language": self.language, - "transcript": text - } - - try: - resp = requests.post(self.api_url, json.dumps(request_json), headers=self.headers) - #file_name = resp.headers["content-disposition"].split("filename=")[1] - with open(output_file, "wb") as file: - file.write(resp.content) - except Exception as e: - raise Exception(f"{__name__} error: {e}") - -async def main(): - from config.settings import load_config, check_config_file - from core.utils import tts - #check_config_file() - config = load_config() - #print(config) - instance = tts.create_instance( - config["selected_module"]["TTS"] - if not 'type' in config["TTS"][config["selected_module"]["TTS"]] - else - config["TTS"][config["selected_module"]["TTS"]]["type"], - config["TTS"][config["selected_module"]["TTS"]], - config["delete_audio"] - ) - file = instance.generate_filename() - print(file) - await instance.text_to_speak("你好", file) - -if __name__ == "__main__": - import asyncio - asyncio.run(main()) - \ No newline at end of file