Commit ae33b80

Close gzip writer before reading stream (#183)
Signed-off-by: Hai Yan <oeyh@amazon.com>
1 parent 3405add commit ae33b80

File tree

1 file changed: +18 −3 lines

lib/logstash/outputs/opensearch/http_client.rb

Lines changed: 18 additions & 3 deletions
```diff
@@ -107,7 +107,6 @@ def bulk(actions)

     body_stream = StringIO.new
     if http_compression
-      body_stream.set_encoding "BINARY"
       stream_writer = gzip_writer(body_stream)
     else
       stream_writer = body_stream
@@ -126,9 +125,24 @@ def bulk(actions)
                     :payload_size => stream_writer.pos,
                     :content_length => body_stream.size,
                     :batch_offset => (index + 1 - batch_actions.size))
+
+        # Have to close gzip writer before reading from body_stream; otherwise stream doesn't end properly
+        # and will cause server side error
+        if http_compression
+          stream_writer.close
+        end
+
         bulk_responses << bulk_send(body_stream, batch_actions)
-        body_stream.truncate(0) && body_stream.seek(0)
-        stream_writer = gzip_writer(body_stream) if http_compression
+
+        if http_compression
+          # Get a new StringIO object and gzip writer
+          body_stream = StringIO.new
+          stream_writer = gzip_writer(body_stream)
+        else
+          # Clear existing StringIO object and reuse existing stream writer
+          body_stream.truncate(0) && body_stream.seek(0)
+        end
+
         batch_actions.clear
       end
       stream_writer.write(as_json)
@@ -149,6 +163,7 @@ def gzip_writer(io)
     fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
     fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0

+    io.set_encoding "BINARY"
     Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
   end
```
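The fix hinges on a gzip framing detail: `Zlib::GzipWriter` buffers output and only emits the gzip trailer (CRC-32 and uncompressed length) when the writer is closed or finished, so reading `body_stream` before closing the writer sends a truncated stream that the server cannot decode. A minimal standalone sketch of the round trip (not the plugin code itself; variable names simply mirror the diff):

```ruby
require 'zlib'
require 'stringio'

body_stream = StringIO.new
body_stream.set_encoding "BINARY"
stream_writer = Zlib::GzipWriter.new(body_stream,
                                     Zlib::DEFAULT_COMPRESSION,
                                     Zlib::DEFAULT_STRATEGY)

stream_writer.write('{"index":{}}' + "\n")

# Before this close, the trailer has not been written, so decompressing
# body_stream.string would fail. Closing the writer flushes remaining data
# and the trailer (it also closes the underlying StringIO, but
# StringIO#string remains readable afterwards).
stream_writer.close

compressed = body_stream.string
decoded = Zlib.gunzip(compressed)
puts decoded
```

This is also why the patched code allocates a fresh `StringIO` per batch when compression is on: the closed writer has closed its underlying stream, so it cannot be truncated and reused the way the uncompressed path reuses `body_stream`.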
154169
