22#
33# SPDX-License-Identifier: MIT
44
5+ import asyncio
56import base64
7+ import inspect
68from pathlib import Path
7- from typing import Any , Optional , Union , cast
9+ from typing import Any , Callable , Optional , Union , cast
810
911from .__version__ import get_version
1012from .constants import DEFAULT_WS_URL
@@ -47,7 +49,9 @@ def __init__(self, token: str, server: Optional[str] = None):
4749 self ._transport = Transport (token , server or DEFAULT_WS_URL )
4850 self .last_pause_nanos = 0
4951 self ._transport .add_event_listener ("sim:pause" , self ._on_pause )
50- self ._pause_queue = EventQueue (self ._transport , "sim:pause" )
52+ # Lazily create in an active event loop (important for py3.9 and sync client)
53+ self ._pause_queue : Optional [EventQueue ] = None
54+ self ._serial_monitor_tasks : set [asyncio .Task [None ]] = set ()
5155
5256 async def connect (self ) -> dict [str , Any ]:
5357 """
@@ -61,7 +65,10 @@ async def connect(self) -> dict[str, Any]:
6165 async def disconnect (self ) -> None :
6266 """
6367 Disconnect from the Wokwi simulator server.
68+
69+ This also stops all active serial monitors.
6470 """
71+ self .stop_serial_monitors ()
6572 await self ._transport .close ()
6673
6774 async def upload (self , name : str , content : bytes ) -> None :
@@ -175,6 +182,8 @@ async def wait_until_simulation_time(self, seconds: float) -> None:
175182 await pause (self ._transport )
176183 remaining_nanos = seconds * 1e9 - self .last_pause_nanos
177184 if remaining_nanos > 0 :
185+ if self ._pause_queue is None :
186+ self ._pause_queue = EventQueue (self ._transport , "sim:pause" )
178187 self ._pause_queue .flush ()
179188 await resume (self ._transport , int (remaining_nanos ))
180189 await self ._pause_queue .get ()
@@ -188,6 +197,49 @@ async def restart_simulation(self, pause: bool = False) -> None:
188197 """
189198 await restart (self ._transport , pause )
190199
200+ def serial_monitor (self , callback : Callable [[bytes ], Any ]) -> asyncio .Task [None ]:
201+ """
202+ Start monitoring the serial output in the background and invoke `callback` for each line.
203+
204+ This method **does not block**: it creates and returns an asyncio.Task that runs until the
205+ transport is closed or the task is cancelled. The callback may be synchronous or async.
206+
207+ Example:
208+ task = client.serial_monitor(lambda line: print(line.decode(), end=""))
209+ ... do other async work ...
210+ task.cancel()
211+ """
212+
213+ async def _runner () -> None :
214+ try :
215+ async for line in monitor_lines (self ._transport ):
216+ try :
217+ result = callback (line )
218+ if inspect .isawaitable (result ):
219+ await result
220+ except Exception :
221+ # Swallow callback exceptions to keep the monitor alive.
222+ # Users can add their own error handling inside the callback.
223+ pass
224+ finally :
225+ # Clean up task from the set when it completes
226+ self ._serial_monitor_tasks .discard (task )
227+
228+ task = asyncio .create_task (_runner (), name = "wokwi-serial-monitor" )
229+ self ._serial_monitor_tasks .add (task )
230+ return task
231+
232+ def stop_serial_monitors (self ) -> None :
233+ """
234+ Stop all active serial monitor tasks.
235+
236+ This method cancels all tasks created by the serial_monitor method.
237+ After calling this method, all active serial monitors will stop receiving data.
238+ """
239+ for task in self ._serial_monitor_tasks .copy ():
240+ task .cancel ()
241+ self ._serial_monitor_tasks .clear ()
242+
191243 async def serial_monitor_cat (self , decode_utf8 : bool = True , errors : str = "replace" ) -> None :
192244 """
193245 Print serial monitor output to stdout as it is received from the simulation.
0 commit comments