Skip to content

Commit 67fe23e

Browse files
committed
make StreamerV2 itself an io.Reader
1 parent aa1d9b0 commit 67fe23e

File tree

2 files changed

+32
-45
lines changed

2 files changed

+32
-45
lines changed

stream_v2.go

Lines changed: 31 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -49,47 +49,6 @@ type DeltaImageURL struct {
4949
Detail string `json:"detail"`
5050
}
5151

52-
// streamTextReader is an io.Reader of the text deltas of thread.message.delta events
53-
type streamTextReader struct {
54-
streamer *StreamerV2
55-
buffer []byte
56-
}
57-
58-
func newStreamTextReader(streamer *StreamerV2) io.Reader {
59-
return &streamTextReader{
60-
streamer: streamer,
61-
}
62-
}
63-
64-
func (r *streamTextReader) Read(p []byte) (int, error) {
65-
// If we have data in the buffer, copy it to p first.
66-
if len(r.buffer) > 0 {
67-
n := copy(p, r.buffer)
68-
r.buffer = r.buffer[n:]
69-
return n, nil
70-
}
71-
72-
for r.streamer.Next() {
73-
// Read only text deltas
74-
text, ok := r.streamer.MessageDeltaText()
75-
if !ok {
76-
continue
77-
}
78-
79-
r.buffer = []byte(text)
80-
n := copy(p, r.buffer)
81-
r.buffer = r.buffer[n:]
82-
return n, nil
83-
}
84-
85-
// Check for streamer error
86-
if err := r.streamer.Err(); err != nil {
87-
return 0, err
88-
}
89-
90-
return 0, io.EOF
91-
}
92-
9352
func NewStreamerV2(r io.Reader) *StreamerV2 {
9453
var rc io.ReadCloser
9554

@@ -111,6 +70,9 @@ type StreamerV2 struct {
11170

11271
scanner *SSEScanner
11372
next any
73+
74+
// buffer for implementing io.Reader
75+
buffer []byte
11476
}
11577

11678
// Close closes the underlying io.ReadCloser
@@ -144,9 +106,34 @@ func (s *StreamerV2) Next() bool {
144106
return true
145107
}
146108

147-
// Reader returns io.Reader of the text deltas of thread.message.delta events
148-
func (s *StreamerV2) Reader() io.Reader {
149-
return newStreamTextReader(s)
109+
// Read implements io.Reader of the text deltas of thread.message.delta events
110+
func (r *StreamerV2) Read(p []byte) (int, error) {
111+
// If we have data in the buffer, copy it to p first.
112+
if len(r.buffer) > 0 {
113+
n := copy(p, r.buffer)
114+
r.buffer = r.buffer[n:]
115+
return n, nil
116+
}
117+
118+
for r.Next() {
119+
// Read only text deltas
120+
text, ok := r.MessageDeltaText()
121+
if !ok {
122+
continue
123+
}
124+
125+
r.buffer = []byte(text)
126+
n := copy(p, r.buffer)
127+
r.buffer = r.buffer[n:]
128+
return n, nil
129+
}
130+
131+
// Check for streamer error
132+
if err := r.Err(); err != nil {
133+
return 0, err
134+
}
135+
136+
return 0, io.EOF
150137
}
151138

152139
func (s *StreamerV2) Event() any {

stream_v2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ data: {"id":"msg_KFiZxHhXYQo6cGFnGjRDHSee","object":"thread.message.delta","delt
1919
event: done
2020
data: [DONE]
2121
`
22-
reader := NewStreamerV2(strings.NewReader(raw)).Reader()
22+
reader := NewStreamerV2(strings.NewReader(raw))
2323

2424
expected := "helloworld"
2525
buffer := make([]byte, len(expected))

0 commit comments

Comments
 (0)