Skip to content

Commit c88e079

Browse files
Fix realtime audio (#129)
* wip
* wip
* wip
* done
1 parent d3acd80 commit c88e079

File tree

4 files changed

Lines changed: 54 additions & 8 deletions

4 files changed

Lines changed: 54 additions & 8 deletions

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "solana-agent"
3-
version = "31.2.1"
3+
version = "31.2.2"
44
description = "AI Agents for Solana"
55
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
66
license = "MIT"

solana_agent/adapters/openai_realtime_ws.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -961,9 +961,6 @@ async def update_session(
961961
if audio_patch:
962962
patch["audio"] = audio_patch
963963

964-
# Always include session.type in updates
965-
patch["type"] = "realtime"
966-
967964
# No top-level turn_detection
968965

969966
def _strip_tool_strict(tools_val):
@@ -1030,7 +1027,8 @@ def _strip_tool_strict(tools_val):
10301027
)
10311028
except Exception:
10321029
pass
1033-
await self._send(payload)
1030+
# Use tracked send to attach an event_id and improve diagnostics
1031+
await self._send_tracked(payload, label="session.update:patch")
10341032

10351033
async def append_audio(self, pcm16_bytes: bytes) -> None: # pragma: no cover
10361034
b64 = base64.b64encode(pcm16_bytes).decode("ascii")
@@ -1045,10 +1043,16 @@ async def append_audio(self, pcm16_bytes: bytes) -> None: # pragma: no cover
10451043

10461044
async def commit_input(self) -> None: # pragma: no cover
10471045
try:
1048-
# Skip commits while a response is active to avoid server errors
1046+
# If a previous response is still marked active, wait briefly, then proceed.
1047+
# Skipping commits here can cause new turns to reference old audio and repeat answers.
10491048
if bool(getattr(self, "_response_active", False)):
1050-
logger.warning("Realtime WS: skipping commit; response active")
1051-
return
1049+
logger.warning(
1050+
"Realtime WS: response active at commit; waiting briefly before proceeding"
1051+
)
1052+
for _ in range(5): # up to ~0.5s
1053+
await asyncio.sleep(0.1)
1054+
if not bool(getattr(self, "_response_active", False)):
1055+
break
10521056
# Avoid overlapping commits while awaiting server ack
10531057
if bool(getattr(self, "_commit_inflight", False)):
10541058
logger.warning("Realtime WS: skipping commit; commit in-flight")
@@ -1250,6 +1254,24 @@ def iter_output_transcript(self) -> AsyncGenerator[str, None]: # pragma: no cov
12501254
def set_tool_executor(self, executor): # pragma: no cover
12511255
self._tool_executor = executor
12521256

1257+
def reset_output_stream(self) -> None: # pragma: no cover
1258+
"""Drain any queued output audio and clear per-response text buffers.
1259+
This avoids replaying stale audio if the client failed to consume previous chunks."""
1260+
try:
1261+
while True:
1262+
try:
1263+
_ = self._audio_queue.get_nowait()
1264+
except asyncio.QueueEmpty:
1265+
break
1266+
except Exception:
1267+
break
1268+
try:
1269+
self._out_text_buffers.clear()
1270+
except Exception:
1271+
pass
1272+
except Exception:
1273+
pass
1274+
12531275
# Expose whether a function/tool call is currently pending
12541276
def has_pending_tool_call(self) -> bool: # pragma: no cover
12551277
try:
@@ -1611,3 +1633,7 @@ async def _empty():
16111633
def set_tool_executor(self, executor): # pragma: no cover
16121634
# Not applicable for transcription-only
16131635
return
1636+
1637+
def reset_output_stream(self) -> None: # pragma: no cover
1638+
# No audio output stream to reset
1639+
return

solana_agent/services/query.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,12 @@ async def _exec(tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
669669
await rt.clear_input()
670670
except Exception:
671671
pass
672+
# Also reset any leftover output audio so new turn doesn't replay old chunks
673+
try:
674+
if hasattr(rt, "reset_output_stream"):
675+
rt.reset_output_stream()
676+
except Exception:
677+
pass
672678

673679
# Persist once per turn
674680
turn_id = await self.realtime_begin_turn(user_id)

solana_agent/services/realtime.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ def iter_events(self) -> AsyncGenerator[Dict[str, Any], None]: # pragma: no cov
185185
def iter_output_audio(self) -> AsyncGenerator[bytes, None]: # pragma: no cover
186186
return self._session.iter_output_audio()
187187

188+
def reset_output_stream(self) -> None: # pragma: no cover
189+
try:
190+
if hasattr(self._session, "reset_output_stream"):
191+
self._session.reset_output_stream()
192+
except Exception:
193+
pass
194+
188195
async def iter_output_audio_encoded(
189196
self,
190197
) -> AsyncGenerator[bytes, None]: # pragma: no cover
@@ -447,6 +454,13 @@ def iter_events(self) -> AsyncGenerator[Dict[str, Any], None]: # pragma: no cov
447454
def iter_output_audio(self) -> AsyncGenerator[bytes, None]: # pragma: no cover
448455
return self._conv.iter_output_audio()
449456

457+
def reset_output_stream(self) -> None: # pragma: no cover
458+
try:
459+
if hasattr(self._conv, "reset_output_stream"):
460+
self._conv.reset_output_stream()
461+
except Exception:
462+
pass
463+
450464
async def iter_output_audio_encoded(
451465
self,
452466
) -> AsyncGenerator[bytes, None]: # pragma: no cover

0 commit comments

Comments
 (0)