Skip to content

Commit 9d6048e

Browse files
author
nshkrdotcom
committed
update 0.1.1
1 parent 2b4bbc2 commit 9d6048e

File tree

4 files changed

+158
-6
lines changed

4 files changed

+158
-6
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ mix pipeline.run evolved_pipelines/sentiment_analyzer_*.yaml
5757
defp deps do
5858
[
5959
# From Hex.pm (recommended)
60-
{:pipeline_ex, "~> 0.1.0"}
60+
{:pipeline_ex, "~> 0.1.1"}
6161

6262
# Or from GitHub
63-
# {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.1.0"}
63+
# {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.1.1"}
6464
]
6565
end
6666
```
6767

68-
> **⚠️ Breaking Change in v0.1.0:** Async streaming system removed. ClaudeCodeSDK already streams messages optimally - the custom buffering system added unnecessary complexity. See `docs/ASYNC_STREAMING_DEPRECATION.md` for details.
68+
> **⚠️ Breaking Change in v0.1.0** Async streaming system removed. ClaudeCodeSDK already streams messages optimally - the custom buffering system added unnecessary complexity. See `docs/ASYNC_STREAMING_DEPRECATION.md` for details.
6969
7070
### Simple API
7171

@@ -761,4 +761,4 @@ TEST_MODE=live elixir run_example.exs # All live
761761

762762
## License
763763

764-
TODO: Add license information
764+
TODO: Add license information
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Async Streaming Migration Guide
2+
3+
This document helps teams migrate from the legacy async streaming hooks to the current streaming architecture introduced in `pipeline_ex` v0.3.
4+
5+
## 1. Migration Checklist
6+
7+
- Audit pipelines for `Pipeline.AsyncStreaming.Handler` usage.
8+
- Replace handler modules with the new `Pipeline.Streaming.*` namespace.
9+
- Confirm each LLM step sets `stream: true` in YAML or step configs.
10+
- Run `mix test --only streaming` to validate the new flow.
11+
12+
## 2. Architectural Changes
13+
14+
| Area | Legacy System | Current System | Migration Notes |
15+
| --- | --- | --- | --- |
16+
| Event shape | `{:chunk, binary}` | `{:delta, %{content: binary}}` | Update pattern matches to extract `%{content: chunk}`. |
17+
| Final callback | `{:complete, result}` | `{:done, response}` | Final responses now carry metadata such as tokens and finish reason. |
18+
| Handler state | Global Agent | Lightweight module functions | Move mutable state into per-run processes (e.g., Task or GenServer). |
19+
| Configuration | `async_stream: true` | `stream: true` | Remove the `async_` prefix everywhere. |
20+
21+
## 3. Updating Elixir Steps
22+
23+
```elixir
24+
# BEFORE
25+
Pipeline.AsyncStreaming.invoke!(
26+
prompt: ctx.prompt,
27+
handler: Pipeline.AsyncStreaming.ConsoleHandler
28+
)
29+
30+
# AFTER
31+
Pipeline.LLM.invoke!(
32+
ctx,
33+
prompt: ctx.prompt,
34+
stream: true,
35+
on_event: &Pipeline.Streaming.ConsoleHandler.handle_event/1
36+
)
37+
```
38+
39+
- Use `Pipeline.Streaming.Handler` behaviour for shared handlers.
40+
- Prefer function captures (`&Module.handle_event/1`) over anonymous functions so the handler remains testable.
41+
42+
## 4. YAML Workflow Changes
43+
44+
```yaml
45+
# BEFORE
46+
- id: draft_report
47+
type: llm
48+
provider: claude
49+
async_stream: true
50+
async_handler: Pipeline.AsyncStreaming.ConsoleHandler
51+
52+
# AFTER
53+
- id: draft_report
54+
type: llm
55+
provider: claude
56+
stream: true
57+
stream_handler: Pipeline.Streaming.ConsoleHandler
58+
```
59+
60+
Make sure the handler module is compiled and available when running `mix pipeline.run`.
61+
62+
## 5. Testing Strategy
63+
64+
- **Unit tests**: use the `Pipeline.TestMode.StreamingRecorder` helper to capture deltas without hitting real providers.
65+
- **Integration tests**: tag scenarios that depend on live streaming with `@tag :streaming` and execute via `mix test --only streaming`.
66+
- **Diagnostics**: set `PIPELINE_STREAM_DEBUG=1` to print raw events during runs.
67+
68+
## 6. Common Issues
69+
70+
- **Handler not invoked**: confirm the module implements `handle_event/1` and is referenced correctly in YAML.
71+
- **Duplicate output**: most often caused by reusing the same handler process; ensure each run starts a fresh handler.
72+
- **Fallback to sync execution**: some providers disable streaming for certain models; verify the chosen model supports it.
73+
74+
## 7. Further Reading
75+
76+
- `examples/STREAMING_GUIDE.md` for a quick-start implementation.
77+
- `docs/ASYNC_STREAMING_ASSESSMENT.md` for evaluation notes on the old system.
78+
- `docs/ASYNC_STREAMING_EVALUATION_REPORT.md` for design trade-offs and roadmap.

examples/STREAMING_GUIDE.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Streaming Guide
2+
3+
This guide walks through enabling streaming output in `pipeline_ex` pipelines.
4+
5+
## When to Use Streaming
6+
7+
- Long running prompts where you want partial progress updates.
8+
- Tooling scenarios that push incremental tokens to a UI.
9+
- Debugging runs that need to expose intermediate state as it is produced.
10+
11+
If your pipeline already emits small responses, synchronous execution is fine. Streaming makes the biggest difference when a step may take tens of seconds or more.
12+
13+
## Quick Start
14+
15+
1. Add the `stream` flag to the step that invokes your LLM provider.
16+
2. Supply a streaming handler to capture events in real time.
17+
18+
```elixir
19+
defmodule Pipeline.StreamingExample do
20+
use Pipeline.Step
21+
22+
def run(ctx) do
23+
Pipeline.LLM.invoke!(
24+
ctx,
25+
prompt: ctx.prompt,
26+
stream: true,
27+
on_event: &handle_event/1
28+
)
29+
end
30+
31+
defp handle_event({:delta, chunk}) do
32+
IO.write(chunk.content)
33+
end
34+
end
35+
```
36+
37+
Store handlers under `lib/pipeline/streaming/` so other pipelines can reuse them.
38+
39+
## YAML Configuration
40+
41+
Streaming can be toggled directly in pipeline YAML workflows.
42+
43+
```yaml
44+
steps:
45+
- id: research_summary
46+
type: llm
47+
provider: claude
48+
prompt: >
49+
Summarize the latest findings on autonomous research pipelines.
50+
stream: true
51+
stream_handler: Pipeline.Streaming.ConsoleHandler
52+
```
53+
54+
### Handler Requirements
55+
56+
- Accept events shaped as `{:delta, chunk}` and `{:done, response}`.
57+
- Avoid blocking calls; use async tasks if you need to buffer or persist output.
58+
- Keep handlers idempotent—retries can trigger duplicate events.
59+
60+
## Testing Streaming Pipelines
61+
62+
- Run `mix test test/streaming` to execute streaming regression suites.
63+
- For ad-hoc verification, execute `mix pipeline.run pipelines/demo.yaml --stream`.
64+
- Capture output with `tee` if you need to diff streamed tokens: `mix pipeline.run ... | tee log.txt`.
65+
66+
## Troubleshooting
67+
68+
- **Nothing appears on screen**: confirm your handler writes to stdout and that the provider supports streaming for the selected model.
69+
- **Out-of-order chunks**: accumulate within a GenServer before printing to guarantee ordering.
70+
- **State bleed between runs**: reset any ETS tables or Agent state in your handler's `init/1`.
71+
72+
For deeper design notes, continue with `docs/ASYNC_STREAMING_MIGRATION_GUIDE.md`.

mix.exs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Pipeline.MixProject do
22
use Mix.Project
33

4-
@version "0.1.0"
4+
@version "0.1.1"
55
@source_url "https://github.com/nshkrdotcom/pipeline_ex"
66

77
def project do
@@ -156,7 +156,9 @@ defmodule Pipeline.MixProject do
156156
"docs/specifications/data_processing_pipelines.md": [title: "Data Processing Pipelines"],
157157
"docs/specifications/model_development_pipelines.md": [
158158
title: "Model Development Pipelines"
159-
]
159+
],
160+
"examples/STREAMING_GUIDE.md": [title: "Streaming Guide"],
161+
"docs/ASYNC_STREAMING_MIGRATION_GUIDE.md": [title: "Async Streaming Migration"]
160162
],
161163
groups_for_modules: [
162164
Core: [Pipeline, Pipeline.Config, Pipeline.Executor],

0 commit comments

Comments
 (0)