Skip to content

Commit c0299ad

Browse files
committed
Add a general purpose collecting method to ReadStream to facilitate the reduction of streams. The collecting method is a default method.
1 parent 79c06b4 commit c0299ad

File tree

4 files changed

+110
-2
lines changed

4 files changed

+110
-2
lines changed

src/main/asciidoc/streams.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,15 @@ returns `true` if the write queue is considered full.
170170
Will be called if an exception occurs on the `WriteStream`.
171171
- {@link io.vertx.core.streams.WriteStream#drainHandler}:
172172
The handler will be called if the `WriteStream` is considered no longer full.
173+
174+
=== Reducing streams
175+
176+
Java collectors can reduce a `ReadStream` to a result in a similar fashion `java.util.Stream` does, yet in an asynchronous
177+
fashion.
178+
179+
[source,$lang]
180+
----
181+
{@link examples.StreamsExamples#reduce1}
182+
----
183+
184+
Note that `collect` overrides any previously handler set on the stream.

src/main/java/examples/StreamsExamples.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
package examples;
1313

14-
import io.vertx.core.Handler;
14+
import io.vertx.core.Future;
1515
import io.vertx.core.Vertx;
1616
import io.vertx.core.buffer.Buffer;
1717
import io.vertx.core.file.AsyncFile;
@@ -20,6 +20,9 @@
2020
import io.vertx.core.net.NetServer;
2121
import io.vertx.core.net.NetServerOptions;
2222
import io.vertx.core.streams.Pipe;
23+
import io.vertx.core.streams.ReadStream;
24+
25+
import java.util.stream.Collectors;
2326

2427
/**
2528
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -164,4 +167,11 @@ public void pipe9(AsyncFile src, AsyncFile dst) {
164167
dst.end(Buffer.buffer("done"));
165168
});
166169
}
170+
171+
public <T> void reduce1(ReadStream<T> stream) {
172+
// Count the number of elements
173+
Future<Long> result = stream.collect(Collectors.counting());
174+
175+
result.onSuccess(count -> System.out.println("Stream emitted " + count + " elements"));
176+
}
167177
}

src/main/java/io/vertx/core/streams/ReadStream.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
package io.vertx.core.streams;
1313

1414
import io.vertx.codegen.annotations.Fluent;
15+
import io.vertx.codegen.annotations.GenIgnore;
1516
import io.vertx.codegen.annotations.Nullable;
1617
import io.vertx.codegen.annotations.VertxGen;
17-
import io.vertx.core.AsyncResult;
1818
import io.vertx.core.Future;
1919
import io.vertx.core.Handler;
2020
import io.vertx.core.Promise;
21+
import io.vertx.core.impl.future.PromiseInternal;
2122
import io.vertx.core.streams.impl.PipeImpl;
2223

24+
import java.util.function.BiConsumer;
25+
2326
/**
2427
* Represents a stream of items that can be read from.
2528
* <p>
@@ -111,6 +114,27 @@ default Pipe<T> pipe() {
111114
return new PipeImpl<>(this);
112115
}
113116

117+
/**
118+
* Apply a {@code collector} to this stream, the obtained result is returned as a future.
119+
* <p/>
120+
* Handlers of this stream are affected by this operation.
121+
*
122+
* @return a future notified with result produced by the {@code collector} applied to this stream
123+
*/
124+
@GenIgnore(GenIgnore.PERMITTED_TYPE)
125+
default <R, A> Future<R> collect(java.util.stream.Collector<T , A , R> collector) {
126+
PromiseInternal<R> promise = (PromiseInternal<R>) Promise.promise();
127+
A cumulation = collector.supplier().get();
128+
BiConsumer<A, T> accumulator = collector.accumulator();
129+
handler(elt -> accumulator.accept(cumulation, elt));
130+
endHandler(v -> {
131+
R result = collector.finisher().apply(cumulation);
132+
promise.tryComplete(result);
133+
});
134+
exceptionHandler(promise::tryFail);
135+
return promise.future();
136+
}
137+
114138
/**
115139
* Pipe this {@code ReadStream} to the {@code WriteStream}.
116140
* <p>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.core.streams;
12+
13+
import io.vertx.core.Future;
14+
import io.vertx.test.core.AsyncTestBase;
15+
import io.vertx.test.fakestream.FakeStream;
16+
import org.junit.Test;
17+
18+
import java.util.Arrays;
19+
import java.util.List;
20+
import java.util.stream.Collectors;
21+
22+
public class ReadStreamReduceTest extends AsyncTestBase {
23+
24+
private FakeStream<Object> dst;
25+
private Object o1 = new Object();
26+
private Object o2 = new Object();
27+
private Object o3 = new Object();
28+
29+
@Override
30+
protected void setUp() throws Exception {
31+
super.setUp();
32+
dst = new FakeStream<>();
33+
}
34+
35+
@Test
36+
public void testCollect() {
37+
Future<List<Object>> list = dst.collect(Collectors.toList());
38+
assertFalse(list.isComplete());
39+
dst.write(o1);
40+
assertFalse(list.isComplete());
41+
dst.write(o2);
42+
assertFalse(list.isComplete());
43+
dst.write(o3);
44+
dst.end();
45+
assertTrue(list.succeeded());
46+
assertEquals(Arrays.asList(o1, o2, o3), list.result());
47+
}
48+
49+
@Test
50+
public void testFailure() {
51+
Future<List<Object>> list = dst.collect(Collectors.toList());
52+
assertFalse(list.isComplete());
53+
dst.write(o1);
54+
assertFalse(list.isComplete());
55+
dst.write(o2);
56+
assertFalse(list.isComplete());
57+
Throwable err = new Throwable();
58+
dst.fail(err);
59+
assertTrue(list.failed());
60+
assertSame(err, list.cause());
61+
}
62+
}

0 commit comments

Comments
 (0)