Skip to content

Commit 9a7243f

Browse files
Fix real-time streaming (#130)
1 parent c88e079 commit 9a7243f

File tree

7 files changed

+617
-192
lines changed

7 files changed

+617
-192
lines changed

README.md

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -281,27 +281,39 @@ Due to the overhead of the router (API call) - realtime only supports a single a
281281

282282
Realtime uses MongoDB for memory so Zep is not needed.
283283

284+
This example will work using expo-audio on Android and iOS.
285+
284286
```python
285287
from solana_agent import SolanaAgent
286288

287289
solana_agent = SolanaAgent(config=config)
288290

289-
# Example: mobile sends MP4/AAC; server encodes output to AAC
290-
audio_content = await audio_file.read() # bytes
291-
async for audio_chunk in solana_agent.process(
292-
"user123", # required
293-
audio_content, # required
294-
realtime=True, # optional (default False)
295-
output_format="audio", # required
296-
vad=True, # enable VAD (optional)
297-
rt_encode_input=True, # accept compressed input (optional)
298-
rt_encode_output=True, # encode output for client (optional)
299-
rt_voice="marin" # the voice to use for interactions (optional)
300-
audio_input_format="mp4", # client transport (optional)
301-
audio_output_format="aac" # client transport (optional)
302-
):
303-
handle_audio(audio_chunk)
304-
```
291+
audio_content = await audio_file.read()
292+
293+
async def generate():
294+
async for chunk in solana_agent.process(
295+
user_id=user_id,
296+
message=audio_content,
297+
realtime=True,
298+
rt_encode_input=True,
299+
rt_encode_output=True,
300+
rt_voice="marin",
301+
output_format="audio",
302+
audio_output_format="m4a",
303+
audio_input_format="mp4",
304+
):
305+
yield chunk
306+
307+
return StreamingResponse(
308+
content=generate(),
309+
media_type="audio/mp4",
310+
headers={
311+
"Cache-Control": "no-store",
312+
"Pragma": "no-cache",
313+
"Content-Disposition": "inline; filename=stream.m4a",
314+
"X-Accel-Buffering": "no",
315+
},
316+
)
305317

306318
### Image/Text Streaming
307319

docs/index.rst

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -205,27 +205,40 @@ Due to the overhead of the router (API call) - realtime only supports a single a
205205

206206
Realtime uses MongoDB for memory so Zep is not needed.
207207

208+
This example will work using expo-audio on Android and iOS.
209+
208210
.. code-block:: python
209211
210212
from solana_agent import SolanaAgent
211213
212214
solana_agent = SolanaAgent(config=config)
213215
214-
# Example: mobile sends MP4/AAC; server encodes output to AAC
215-
audio_content = await audio_file.read() # bytes
216-
async for audio_chunk in solana_agent.process(
217-
"user123", # required
218-
audio_content, # required
219-
realtime=True, # optional (default False)
220-
output_format="audio", # required
221-
vad=True, # enable VAD (optional)
222-
rt_encode_input=True, # accept compressed input (optional)
223-
rt_encode_output=True, # encode output for client (optional)
224-
rt_voice="marin" # the voice to use for interactions (optional)
225-
audio_input_format="mp4", # client transport (optional)
226-
audio_output_format="aac" # client transport (optional)
227-
):
228-
handle_audio(audio_chunk)
216+
audio_content = await audio_file.read()
217+
218+
async def generate():
219+
async for chunk in solana_agent.process(
220+
user_id=user_id,
221+
message=audio_content,
222+
realtime=True,
223+
rt_encode_input=True,
224+
rt_encode_output=True,
225+
rt_voice="marin",
226+
output_format="audio",
227+
audio_output_format="m4a",
228+
audio_input_format="mp4",
229+
):
230+
yield chunk
231+
232+
return StreamingResponse(
233+
content=generate(),
234+
media_type="audio/mp4",
235+
headers={
236+
"Cache-Control": "no-store",
237+
"Pragma": "no-cache",
238+
"Content-Disposition": "inline; filename=stream.m4a",
239+
"X-Accel-Buffering": "no",
240+
},
241+
)
229242
230243
Image/Text Streaming
231244
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "solana-agent"
3-
version = "31.2.2"
3+
version = "31.2.3"
44
description = "AI Agents for Solana"
55
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
66
license = "MIT"

solana_agent/adapters/ffmpeg_transcoder.py

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,14 @@ async def to_pcm16( # pragma: no cover
8888
async def from_pcm16( # pragma: no cover
8989
self, pcm16_bytes: bytes, output_mime: str, rate_hz: int
9090
) -> bytes:
91-
"""Encode PCM16LE to desired format (currently AAC ADTS for mobile streaming)."""
91+
"""Encode PCM16LE to desired format (AAC ADTS, fragmented MP4, or MP3)."""
9292
logger.info(
9393
"Encode from PCM16: output_mime=%s, rate_hz=%d, input_len=%d",
9494
output_mime,
9595
rate_hz,
9696
len(pcm16_bytes),
9797
)
98+
9899
if output_mime in ("audio/mpeg", "audio/mp3"):
99100
# Encode to MP3 (often better streaming compatibility on mobile)
100101
args = [
@@ -122,8 +123,9 @@ async def from_pcm16( # pragma: no cover
122123
"Encoded from PCM16 to %s: output_len=%d", output_mime, len(out)
123124
)
124125
return out
125-
if output_mime in ("audio/aac", "audio/mp4", "audio/m4a"):
126-
# Encode to AAC in ADTS stream; clients can play it as AAC.
126+
127+
if output_mime in ("audio/aac",):
128+
# Encode to AAC in ADTS stream; good for streaming over sockets/HTTP chunked
127129
args = [
128130
"-hide_banner",
129131
"-loglevel",
@@ -149,6 +151,38 @@ async def from_pcm16( # pragma: no cover
149151
"Encoded from PCM16 to %s: output_len=%d", output_mime, len(out)
150152
)
151153
return out
154+
155+
if output_mime in ("audio/mp4", "audio/m4a"):
156+
# Encode to fragmented MP4 (fMP4) with AAC for better iOS compatibility
157+
# For streaming, write an initial moov and fragment over stdout.
158+
args = [
159+
"-hide_banner",
160+
"-loglevel",
161+
"error",
162+
"-f",
163+
"s16le",
164+
"-ac",
165+
"1",
166+
"-ar",
167+
str(rate_hz),
168+
"-i",
169+
"pipe:0",
170+
"-c:a",
171+
"aac",
172+
"-b:a",
173+
"96k",
174+
"-movflags",
175+
"+frag_keyframe+empty_moov",
176+
"-f",
177+
"mp4",
178+
"pipe:1",
179+
]
180+
out = await self._run_ffmpeg(args, pcm16_bytes)
181+
logger.info(
182+
"Encoded from PCM16 to %s (fMP4): output_len=%d", output_mime, len(out)
183+
)
184+
return out
185+
152186
# Default: passthrough
153187
logger.info("Encode passthrough (no change), output_len=%d", len(pcm16_bytes))
154188
return pcm16_bytes
@@ -187,7 +221,7 @@ async def stream_from_pcm16( # pragma: no cover
187221
"mp3",
188222
"pipe:1",
189223
]
190-
elif output_mime in ("audio/aac", "audio/mp4", "audio/m4a"):
224+
elif output_mime in ("audio/aac",):
191225
args = [
192226
"-hide_banner",
193227
"-loglevel",
@@ -208,6 +242,29 @@ async def stream_from_pcm16( # pragma: no cover
208242
"adts",
209243
"pipe:1",
210244
]
245+
elif output_mime in ("audio/mp4", "audio/m4a"):
246+
args = [
247+
"-hide_banner",
248+
"-loglevel",
249+
"error",
250+
"-f",
251+
"s16le",
252+
"-ac",
253+
"1",
254+
"-ar",
255+
str(rate_hz),
256+
"-i",
257+
"pipe:0",
258+
"-c:a",
259+
"aac",
260+
"-b:a",
261+
"96k",
262+
"-movflags",
263+
"+frag_keyframe+empty_moov",
264+
"-f",
265+
"mp4",
266+
"pipe:1",
267+
]
211268
else:
212269
# Passthrough streaming: just yield input
213270
async for chunk in pcm_iter:

solana_agent/adapters/openai_realtime_ws.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,26 @@ async def _recv_loop(self) -> None: # pragma: no cover
325325
try:
326326
chunk = base64.b64decode(b64)
327327
self._audio_queue.put_nowait(chunk)
328-
logger.info("Audio delta bytes=%d", len(chunk))
328+
# Ownership/response tagging for diagnostics
329+
try:
330+
owner = getattr(self, "_owner_user_id", None)
331+
except Exception:
332+
owner = None
333+
try:
334+
rid = getattr(self, "_active_response_id", None)
335+
except Exception:
336+
rid = None
337+
try:
338+
gen = int(getattr(self, "_response_generation", 0))
339+
except Exception:
340+
gen = None
341+
logger.info(
342+
"Audio delta bytes=%d owner=%s rid=%s gen=%s",
343+
len(chunk),
344+
owner,
345+
rid,
346+
gen,
347+
)
329348
try:
330349
# New response detected if we were previously inactive
331350
if not getattr(self, "_response_active", False):
@@ -492,8 +511,25 @@ async def _recv_loop(self) -> None: # pragma: no cover
492511
"response.audio.done",
493512
):
494513
# End of audio stream for the response; stop audio iterator but keep WS open for transcripts
514+
try:
515+
owner = getattr(self, "_owner_user_id", None)
516+
except Exception:
517+
owner = None
518+
try:
519+
rid = (data.get("response") or {}).get("id") or getattr(
520+
self, "_active_response_id", None
521+
)
522+
except Exception:
523+
rid = None
524+
try:
525+
gen = int(getattr(self, "_response_generation", 0))
526+
except Exception:
527+
gen = None
495528
logger.info(
496-
"Realtime WS: output audio done; ending audio stream"
529+
"Realtime WS: output audio done; owner=%s rid=%s gen=%s",
530+
owner,
531+
rid,
532+
gen,
497533
)
498534
# If we have a buffered transcript for this response, flush it now
499535
try:

0 commit comments

Comments
 (0)