Skip to content

Commit 0e7dd22

Browse files
Add method on FS2Channel: resource (#80)
* Add method on FS2Channel: resources * Rewrite old stream variant in terms of resource --------- Co-authored-by: ghostbuster91 <ghostbuster91@users.noreply.github.com>
1 parent 703e25a commit 0e7dd22

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ trait FS2Channel[F[_]] extends Channel[F] {
4747

4848
object FS2Channel {
4949

50-
def apply[F[_]: Concurrent](
50+
def resource[F[_]: Concurrent](
5151
bufferSize: Int = 2048,
5252
cancelTemplate: Option[CancelTemplate] = None
53-
): Stream[F, FS2Channel[F]] = {
53+
): Resource[F, FS2Channel[F]] = {
5454
for {
55-
supervisor <- Stream.resource(Supervisor[F])
56-
ref <- Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, Vector.empty, 0)).toStream
57-
queue <- cats.effect.std.Queue.bounded[F, Message](bufferSize).toStream
55+
supervisor <- Supervisor[F]
56+
ref <- Resource.eval(Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, Vector.empty, 0)))
57+
queue <- Resource.eval(cats.effect.std.Queue.bounded[F, Message](bufferSize))
5858
impl = new Impl(queue, ref, supervisor, cancelTemplate)
5959

6060
// Creating a bespoke endpoint to receive cancelation requests
@@ -66,10 +66,15 @@ object FS2Channel {
6666
}
6767
}
6868
// mounting the cancelation endpoint
69-
_ <- maybeCancelEndpoint.traverse_(ep => impl.mountEndpoint(ep)).toStream
69+
_ <- Resource.eval(maybeCancelEndpoint.traverse_(ep => impl.mountEndpoint(ep)))
7070
} yield impl
7171
}
7272

73+
def apply[F[_]: Concurrent](
74+
bufferSize: Int = 2048,
75+
cancelTemplate: Option[CancelTemplate] = None
76+
): Stream[F, FS2Channel[F]] = Stream.resource(resource(bufferSize, cancelTemplate))
77+
7378
private case class State[F[_]](
7479
runningCalls: Map[CallId, Fiber[F, Throwable, Unit]],
7580
pendingCalls: Map[CallId, OutputMessage => F[Unit]],

0 commit comments

Comments
 (0)