1616
1717package org .cloudfoundry .reactor .doppler ;
1818
19+ import org .slf4j .Logger ;
20+ import org .slf4j .LoggerFactory ;
21+
1922import io .netty .buffer .Unpooled ;
2023import io .netty .handler .codec .DelimiterBasedFrameDecoder ;
2124import io .netty .handler .codec .http .HttpHeaderNames ;
2225import reactor .core .publisher .Flux ;
2326import reactor .netty .ByteBufFlux ;
2427import reactor .netty .http .client .HttpClientResponse ;
2528
29+ import java .io .IOException ;
2630import java .io .InputStream ;
2731import java .nio .charset .Charset ;
2832import java .util .regex .Matcher ;
2933import java .util .regex .Pattern ;
3034
3135final class MultipartCodec {
3236
37+ private static final Logger LOGGER = LoggerFactory .getLogger (MultipartCodec .class );
38+
3339 private static final Pattern BOUNDARY_PATTERN = Pattern .compile ("multipart/.+; boundary=(.*)" );
3440
3541 private static final int MAX_PAYLOAD_SIZE = 1024 * 1024 ;
@@ -49,7 +55,8 @@ static DelimiterBasedFrameDecoder createDecoder(HttpClientResponse response) {
4955
5056 static Flux <InputStream > decode (ByteBufFlux body ) {
5157 return body .asInputStream ()
52- .skip (1 );
58+ .skip (1 )
59+ .doOnDiscard (InputStream .class , MultipartCodec ::close );
5360 }
5461
5562 private static String extractMultipartBoundary (HttpClientResponse response ) {
@@ -63,4 +70,12 @@ private static String extractMultipartBoundary(HttpClientResponse response) {
6370 }
6471 }
6572
73+ private static void close (InputStream in ) {
74+ try {
75+ in .close ();
76+ } catch (IOException e ) {
77+ LOGGER .warn ("Could not close input stream. This will cause a direct memory leak." , e );
78+ }
79+ }
80+
6681}
0 commit comments