Skip to content

Commit c2be20b

Browse files
authored
Merge pull request #589 from ingemar/ingemar-pass-stream-event-to-user_proc
Refactor stream handling to expose event labels
2 parents 2052e40 + 6b4ff78 commit c2be20b

File tree

9 files changed

+188
-112
lines changed

9 files changed

+188
-112
lines changed

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ client.chat(
282282
model: "deepseek-chat", # Required.
283283
messages: [{ role: "user", content: "Hello!"}], # Required.
284284
temperature: 0.7,
285-
stream: proc do |chunk, _bytesize|
285+
stream: proc do |chunk, _event|
286286
print chunk.dig("choices", 0, "delta", "content")
287287
end
288288
}
@@ -313,7 +313,7 @@ client.chat(
313313
model: "llama3", # Required.
314314
messages: [{ role: "user", content: "Hello!"}], # Required.
315315
temperature: 0.7,
316-
stream: proc do |chunk, _bytesize|
316+
stream: proc do |chunk, _event|
317317
print chunk.dig("choices", 0, "delta", "content")
318318
end
319319
}
@@ -337,7 +337,7 @@ client.chat(
337337
model: "llama3-8b-8192", # Required.
338338
messages: [{ role: "user", content: "Hello!"}], # Required.
339339
temperature: 0.7,
340-
stream: proc do |chunk, _bytesize|
340+
stream: proc do |chunk, _event|
341341
print chunk.dig("choices", 0, "delta", "content")
342342
end
343343
}
@@ -423,7 +423,7 @@ client.chat(
423423
model: "gpt-4o", # Required.
424424
messages: [{ role: "user", content: "Describe a character called Anna!"}], # Required.
425425
temperature: 0.7,
426-
stream: proc do |chunk, _bytesize|
426+
stream: proc do |chunk, _event|
427427
print chunk.dig("choices", 0, "delta", "content")
428428
end
429429
}
@@ -509,7 +509,7 @@ You can stream it as well!
509509
model: "gpt-4o",
510510
messages: [{ role: "user", content: "Can I have some JSON please?"}],
511511
response_format: { type: "json_object" },
512-
stream: proc do |chunk, _bytesize|
512+
stream: proc do |chunk, _event|
513513
print chunk.dig("choices", 0, "delta", "content")
514514
end
515515
}
@@ -594,7 +594,7 @@ client.responses.create(
594594
parameters: {
595595
model: "gpt-4o", # Required.
596596
input: "Hello!", # Required.
597-
stream: proc do |chunk, _bytesize|
597+
stream: proc do |chunk, _event|
598598
if chunk["type"] == "response.output_text.delta"
599599
print chunk["delta"]
600600
$stdout.flush # Ensure output is displayed immediately
@@ -1216,7 +1216,7 @@ client.runs.create(
12161216
assistant_id: assistant_id,
12171217
max_prompt_tokens: 256,
12181218
max_completion_tokens: 16,
1219-
stream: proc do |chunk, _bytesize|
1219+
stream: proc do |chunk, _event|
12201220
if chunk["object"] == "thread.message.delta"
12211221
print chunk.dig("delta", "content", 0, "text", "value")
12221222
end

lib/openai.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
require_relative "openai/realtime"
1414
require_relative "openai/runs"
1515
require_relative "openai/run_steps"
16+
require_relative "openai/stream"
1617
require_relative "openai/vector_stores"
1718
require_relative "openai/vector_store_files"
1819
require_relative "openai/vector_store_file_batches"

lib/openai/http.rb

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,27 +55,6 @@ def parse_json(response)
5555
original_response
5656
end
5757

58-
# Given a proc, returns an outer proc that can be used to iterate over a JSON stream of chunks.
59-
# For each chunk, the inner user_proc is called giving it the JSON object. The JSON object could
60-
# be a data object or an error object as described in the OpenAI API documentation.
61-
#
62-
# @param user_proc [Proc] The inner proc to call for each JSON object in the chunk.
63-
# @return [Proc] An outer proc that iterates over a raw stream, converting it to JSON.
64-
def to_json_stream(user_proc:)
65-
parser = EventStreamParser::Parser.new
66-
67-
proc do |chunk, _bytes, env|
68-
if env && env.status != 200
69-
raise_error = Faraday::Response::RaiseError.new
70-
raise_error.on_complete(env.merge(body: try_parse_json(chunk)))
71-
end
72-
73-
parser.feed(chunk) do |_type, data|
74-
user_proc.call(JSON.parse(data)) unless data == "[DONE]"
75-
end
76-
end
77-
end
78-
7958
def conn(multipart: false)
8059
connection = Faraday.new do |f|
8160
f.options[:timeout] = @request_timeout
@@ -120,7 +99,7 @@ def configure_json_post_request(req, parameters)
12099
req_parameters = parameters.dup
121100

122101
if parameters[:stream].respond_to?(:call)
123-
req.options.on_data = to_json_stream(user_proc: parameters[:stream])
102+
req.options.on_data = Stream.new(user_proc: parameters[:stream]).to_proc
124103
req_parameters[:stream] = true # Necessary to tell OpenAI to stream.
125104
elsif parameters[:stream]
126105
raise ArgumentError, "The stream parameter must be a Proc or have a #call method"
@@ -129,11 +108,5 @@ def configure_json_post_request(req, parameters)
129108
req.headers = headers
130109
req.body = req_parameters.to_json
131110
end
132-
133-
def try_parse_json(maybe_json)
134-
JSON.parse(maybe_json)
135-
rescue JSON::ParserError
136-
maybe_json
137-
end
138111
end
139112
end

lib/openai/stream.rb

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
module OpenAI
  # Adapts a user-supplied callback into a Faraday `on_data` handler for
  # server-sent-event (SSE) streams. Each SSE `data:` payload is parsed as
  # JSON and handed to the callback; the SSE event label is passed as a
  # second argument when the callback accepts one. One-argument callbacks
  # (the pre-existing `|chunk, _bytesize|`-era style) keep working unchanged.
  class Stream
    # Sentinel payload the API sends to mark end-of-stream; it is never
    # forwarded to the user callback.
    DONE = "[DONE]".freeze
    private_constant :DONE

    # @param user_proc [Proc, #call] Callback invoked once per parsed event.
    # @param parser [EventStreamParser::Parser] SSE parser; injectable for tests.
    def initialize(user_proc:, parser: EventStreamParser::Parser.new)
      @user_proc = user_proc
      @parser = parser

      # To be backwards compatible, we need to check how many arguments the
      # user_proc takes. `arity` is negative when there are optional or rest
      # parameters, so take the absolute value as the number of arguments we
      # may safely pass.
      @user_proc_arity =
        case user_proc
        when Proc
          user_proc.arity.abs
        else
          user_proc.method(:call).arity.abs
        end
    end

    # Faraday `on_data` entry point.
    #
    # `env` defaults to nil because #to_proc yields a strict-arity lambda
    # (Method#to_proc): without the default, an adapter invoking the handler
    # with only (chunk, bytes) would raise ArgumentError.
    # NOTE(review): some Faraday versions/adapters do call on_data with just
    # two arguments — the default keeps both call shapes working.
    #
    # @param chunk [String] Raw SSE bytes received from the server.
    # @param _bytes [Integer] Total bytes received so far (unused).
    # @param env [Faraday::Env, nil] Response env, when the adapter provides it.
    def call(chunk, _bytes, env = nil)
      handle_http_error(chunk: chunk, env: env) if env && env.status != 200

      parser.feed(chunk) do |event, data|
        next if data == DONE

        # Pass only as many arguments as the callback declares.
        args = [JSON.parse(data), event].first(user_proc_arity)
        user_proc.call(*args)
      end
    end

    # @return [Proc] A lambda suitable for `req.options.on_data`.
    def to_proc
      method(:call).to_proc
    end

    private

    attr_reader :user_proc, :parser, :user_proc_arity

    # Surfaces non-200 responses through Faraday's standard error classes,
    # attaching the (parsed, when possible) body so callers see the API's
    # error payload.
    def handle_http_error(chunk:, env:)
      raise_error = Faraday::Response::RaiseError.new
      raise_error.on_complete(env.merge(body: try_parse_json(chunk)))
    end

    # Best-effort JSON parse: returns the raw string when it is not JSON.
    def try_parse_json(maybe_json)
      JSON.parse(maybe_json)
    rescue JSON::ParserError
      maybe_json
    end
  end
end

spec/openai/client/chat_spec.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
describe "streaming" do
8383
let(:chunks) { [] }
8484
let(:stream) do
85-
proc do |chunk, _bytesize|
85+
proc do |chunk, _event|
8686
chunks << chunk
8787
end
8888
end
@@ -196,7 +196,7 @@ def call(chunk)
196196
end
197197
let(:chunks) { [] }
198198
let(:stream) do
199-
proc do |chunk, _bytesize|
199+
proc do |chunk, _event|
200200
chunks << chunk
201201
end
202202
end
@@ -224,7 +224,7 @@ def call(chunk)
224224
end
225225
let(:chunks) { [] }
226226
let(:stream) do
227-
proc do |chunk, _bytesize|
227+
proc do |chunk, _event|
228228
chunks << chunk
229229
end
230230
end

spec/openai/client/http_spec.rb

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
context "streaming" do
5656
let(:chunks) { [] }
5757
let(:stream) do
58-
proc do |chunk, _bytesize|
58+
proc do |chunk, _event|
5959
chunks << chunk
6060
end
6161
end
@@ -120,75 +120,6 @@
120120
end
121121
end
122122

123-
describe ".to_json_stream" do
124-
context "with a proc" do
125-
let(:user_proc) { proc { |x| x } }
126-
let(:stream) { OpenAI::Client.new.send(:to_json_stream, user_proc: user_proc) }
127-
128-
it "returns a proc" do
129-
expect(stream).to be_a(Proc)
130-
end
131-
132-
context "when called with a string containing a single JSON object" do
133-
it "calls the user proc with the data parsed as JSON" do
134-
expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))
135-
stream.call(<<~CHUNK)
136-
data: { "foo": "bar" }
137-
138-
#
139-
CHUNK
140-
end
141-
end
142-
143-
context "when called with a string containing more than one JSON object" do
144-
it "calls the user proc for each data parsed as JSON" do
145-
expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))
146-
expect(user_proc).to receive(:call).with(JSON.parse('{"baz": "qud"}'))
147-
148-
stream.call(<<~CHUNK)
149-
data: { "foo": "bar" }
150-
151-
data: { "baz": "qud" }
152-
153-
data: [DONE]
154-
155-
#
156-
CHUNK
157-
end
158-
end
159-
160-
context "when called with string containing invalid JSON" do
161-
let(:chunk) do
162-
<<~CHUNK
163-
data: { "foo": "bar" }
164-
165-
data: NOT JSON
166-
167-
#
168-
CHUNK
169-
end
170-
171-
it "raise an error" do
172-
expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))
173-
174-
expect do
175-
stream.call(chunk)
176-
end.to raise_error(JSON::ParserError)
177-
end
178-
end
179-
180-
context "when called with JSON split across chunks" do
181-
it "calls the user proc with the data parsed as JSON" do
182-
expect(user_proc).to receive(:call).with(JSON.parse('{ "foo": "bar" }'))
183-
expect do
184-
stream.call("data: { \"foo\":")
185-
stream.call(" \"bar\" }\n\n")
186-
end.not_to raise_error
187-
end
188-
end
189-
end
190-
end
191-
192123
describe ".parse_json" do
193124
context "with a jsonl string" do
194125
let(:body) { "{\"prompt\":\":)\"}\n{\"prompt\":\":(\"}\n" }

spec/openai/client/responses_spec.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
describe "streaming" do
9292
let(:chunks) { [] }
9393
let(:stream) do
94-
proc do |chunk, _bytesize|
94+
proc do |chunk, _event|
9595
chunks << chunk
9696
end
9797
end
@@ -112,13 +112,15 @@
112112
let(:cassette) { "responses stream without proc" }
113113
let(:stream) do
114114
Class.new do
115-
attr_reader :chunks
115+
attr_reader :chunks, :events
116116

117117
def initialize
118118
@chunks = []
119+
@events = []
119120
end
120121

121-
def call(chunk)
122+
def call(chunk, event)
123+
@events << event
122124
@chunks << chunk
123125
end
124126
end.new
@@ -132,6 +134,8 @@ def call(chunk)
132134
.map { |chunk| chunk["delta"] }
133135
.join
134136
expect(output_text).to include("?")
137+
expect(stream.events.first).to eq("response.created")
138+
expect(stream.events.last).to eq("response.completed")
135139
end
136140
end
137141
end

spec/openai/client/runs_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
describe "streaming" do
8686
let(:chunks) { [] }
8787
let(:stream) do
88-
proc do |chunk, _bytesize|
88+
proc do |chunk, _event|
8989
chunks << chunk
9090
end
9191
end

0 commit comments

Comments
 (0)