Skip to content

Commit 1ca96ee

Browse files
Merge pull request #82 from neandertech/smithy-simplify
Simplify Smithy integration
2 parents e45acf7 + cb40262 commit 1ca96ee

File tree

13 files changed

+53
-73
lines changed

13 files changed

+53
-73
lines changed

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ val smithy4s = projectMatrix
121121
.nativePlatform(Seq(scala3))
122122
.disablePlugins(AssemblyPlugin)
123123
.enablePlugins(Smithy4sCodegenPlugin)
124-
.dependsOn(fs2)
124+
.dependsOn(core)
125125
.settings(
126126
name := "jsonrpclib-smithy4s",
127127
commonSettings,
@@ -169,7 +169,7 @@ val exampleClient = projectMatrix
169169
val exampleSmithyShared = projectMatrix
170170
.in(file("modules") / "examples/smithyShared")
171171
.jvmPlatform(List(scala213), commonJvmSettings)
172-
.dependsOn(smithy4s)
172+
.dependsOn(smithy4s, fs2)
173173
.enablePlugins(Smithy4sCodegenPlugin)
174174
.settings(
175175
commonSettings,

modules/core/src/main/scala/jsonrpclib/Monadic.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@ trait Monadic[F[_]] {
88
def doPure[A](a: A): F[A]
99
def doAttempt[A](fa: F[A]): F[Either[Throwable, A]]
1010
def doRaiseError[A](e: Throwable): F[A]
11+
def doMap[A, B](fa: F[A])(f: A => B): F[B] = doFlatMap(fa)(a => doPure(f(a)))
12+
def doVoid[A](fa: F[A]): F[Unit] = doMap(fa)(_ => ())
1113
}
1214

1315
object Monadic {
16+
def apply[F[_]](implicit F: Monadic[F]): Monadic[F] = F
17+
1418
implicit def monadicFuture(implicit ec: ExecutionContext): Monadic[Future] = new Monadic[Future] {
1519
def doFlatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
1620

@@ -19,5 +23,7 @@ object Monadic {
1923
def doAttempt[A](fa: Future[A]): Future[Either[Throwable, A]] = fa.map(Right(_)).recover(Left(_))
2024

2125
def doRaiseError[A](e: Throwable): Future[A] = Future.failed(e)
26+
27+
override def doMap[A, B](fa: Future[A])(f: A => B): Future[B] = fa.map(f)
2228
}
2329
}

modules/examples/client/src/main/scala/examples/client/ClientMain.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ object ClientMain extends IOApp.Simple {
3232
// Starting the server
3333
rp <- fs2.Stream.resource(Processes[IO].spawn(process.ProcessBuilder("java", "-jar", serverJar)))
3434
// Creating a channel that will be used to communicate to the server
35-
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
35+
fs2Channel <- FS2Channel.stream[IO](cancelTemplate = cancelEndpoint.some)
3636
_ <- Stream(())
3737
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin))
3838
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce))

modules/examples/server/src/main/scala/examples/server/ServerMain.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ object ServerMain extends IOApp.Simple {
2828
def run: IO[Unit] = {
2929
// Using errorln as stdout is used by the RPC channel
3030
IO.consoleForIO.errorln("Starting server") >>
31-
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
31+
FS2Channel
32+
.stream[IO](cancelTemplate = Some(cancelEndpoint))
3233
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
3334
.flatMap(channel =>
3435
fs2.Stream

modules/examples/smithyClient/src/main/scala/examples/smithy/client/ClientMain.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ object SmithyClientMain extends IOApp.Simple {
3232
// Starting the server
3333
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
3434
// Creating a channel that will be used to communicate to the server
35-
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
35+
fs2Channel <- FS2Channel.stream[IO](cancelTemplate = cancelEndpoint.some)
3636
// Mounting our implementation of the generated interface onto the channel
3737
_ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client))
3838
// Creating stubs to talk to the remote server
39-
server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel)
39+
server: TestServer[IO] = ClientStub(test.TestServer, fs2Channel)
4040
_ <- Stream(())
4141
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin))
4242
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce))

modules/examples/smithyServer/src/main/scala/examples/smithy/server/ServerMain.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@ object ServerMain extends IOApp.Simple {
2424

2525
def run: IO[Unit] = {
2626
val run =
27-
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
27+
FS2Channel
28+
.stream[IO](cancelTemplate = Some(cancelEndpoint))
2829
.flatMap { channel =>
29-
ClientStub
30-
.stream(TestClient, channel)
31-
.flatMap { testClient =>
32-
channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient)))
33-
}
30+
val testClient = ClientStub(TestClient, channel)
31+
channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient)))
3432
}
3533
.flatMap { channel =>
3634
fs2.Stream

modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,15 @@ object FS2Channel {
7070
} yield impl
7171
}
7272

73+
@deprecated("use stream or resource", "0.0.9")
7374
def apply[F[_]: Concurrent](
7475
bufferSize: Int = 2048,
7576
cancelTemplate: Option[CancelTemplate] = None
77+
): Stream[F, FS2Channel[F]] = stream(bufferSize, cancelTemplate)
78+
79+
def stream[F[_]: Concurrent](
80+
bufferSize: Int = 2048,
81+
cancelTemplate: Option[CancelTemplate] = None
7682
): Stream[F, FS2Channel[F]] = Stream.resource(resource(bufferSize, cancelTemplate))
7783

7884
private case class State[F[_]](

modules/fs2/src/main/scala/jsonrpclib/fs2/package.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ package object fs2 {
2424
def doAttempt[A](fa: F[A]): F[Either[Throwable, A]] = MonadThrow[F].attempt(fa)
2525

2626
def doRaiseError[A](e: Throwable): F[A] = MonadThrow[F].raiseError(e)
27+
28+
override def doMap[A, B](fa: F[A])(f: A => B): F[B] = Monad[F].map(fa)(f)
29+
30+
override def doVoid[A](fa: F[A]): F[Unit] = Monad[F].void(fa)
2731
}
2832

2933
}

modules/fs2/src/test/scala/jsonrpclib/fs2/FS2ChannelSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ object FS2ChannelSpec extends SimpleIOSuite {
3030
def setup(cancelTemplate: CancelTemplate, endpoints: Endpoint[IO]*) = setupAux(endpoints, Some(cancelTemplate))
3131
def setupAux(endpoints: Seq[Endpoint[IO]], cancelTemplate: Option[CancelTemplate]): Stream[IO, ClientSideChannel] = {
3232
for {
33-
serverSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate)
34-
clientSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate)
33+
serverSideChannel <- FS2Channel.stream[IO](cancelTemplate = cancelTemplate)
34+
clientSideChannel <- FS2Channel.stream[IO](cancelTemplate = cancelTemplate)
3535
_ <- serverSideChannel.withEndpointsStream(endpoints)
3636
_ <- Stream(())
3737
.concurrently(clientSideChannel.output.through(serverSideChannel.input))

modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ClientStub.scala

Lines changed: 17 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,34 @@
11
package jsonrpclib.smithy4sinterop
22

33
import smithy4s.~>
4-
import cats.MonadThrow
5-
import jsonrpclib.fs2._
64
import smithy4s.Service
75
import smithy4s.schema._
8-
import cats.effect.kernel.Async
9-
import smithy4s.kinds.PolyFunction5
106
import smithy4s.ShapeId
11-
import cats.syntax.all._
127
import smithy4s.json.Json
138
import jsonrpclib.Codec._
149
import com.github.plokhotnyuk.jsoniter_scala.core._
10+
import jsonrpclib.Channel
11+
import jsonrpclib.Monadic
1512

1613
object ClientStub {
1714

18-
def apply[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit
19-
F: Async[F]
20-
): F[service.Impl[F]] = new ClientStub(service, channel).compile
21-
22-
def stream[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit
23-
F: Async[F]
24-
): fs2.Stream[F, service.Impl[F]] = fs2.Stream.eval(new ClientStub(service, channel).compile)
15+
def apply[Alg[_[_, _, _, _, _]], F[_]: Monadic](service: Service[Alg], channel: Channel[F]): service.Impl[F] =
16+
new ClientStub(service, channel).compile
2517
}
2618

27-
private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], channel: FS2Channel[F])(implicit
28-
F: Async[F]
29-
) {
30-
31-
def compile: F[service.Impl[F]] = precompileAll.map { stubCache =>
32-
val interpreter = new service.FunctorInterpreter[F] {
33-
def apply[I, E, O, SI, SO](op: service.Operation[I, E, O, SI, SO]): F[O] = {
34-
val smithy4sEndpoint = service.endpoint(op)
35-
val input = service.input(op)
36-
(stubCache(smithy4sEndpoint): F[I => F[O]]).flatMap { stub =>
37-
stub(input)
38-
}
19+
private class ClientStub[Alg[_[_, _, _, _, _]], F[_]: Monadic](val service: Service[Alg], channel: Channel[F]) {
20+
21+
def compile: service.Impl[F] = {
22+
val interpreter = new service.FunctorEndpointCompiler[F] {
23+
def apply[I, E, O, SI, SO](e: service.Endpoint[I, E, O, SI, SO]): I => F[O] = {
24+
val shapeId = e.id
25+
val spec = EndpointSpec.fromHints(e.hints).toRight(NotJsonRPCEndpoint(shapeId)).toTry.get
26+
27+
jsonRPCStub(e, spec)
3928
}
4029
}
41-
service.fromPolyFunction(interpreter)
42-
}
4330

44-
private type Stub[I, E, O, SI, SO] = F[I => F[O]]
45-
private val precompileAll: F[PolyFunction5[service.Endpoint, Stub]] = {
46-
F.ref(Map.empty[ShapeId, Any]).flatMap { cache =>
47-
service.endpoints.toList
48-
.traverse_ { ep =>
49-
val shapeId = ep.id
50-
EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).flatMap { epSpec =>
51-
val stub = jsonRPCStub(ep, epSpec)
52-
cache.update(_ + (shapeId -> stub))
53-
}
54-
}
55-
.as {
56-
new PolyFunction5[service.Endpoint, Stub] {
57-
def apply[I, E, O, SI, SO](ep: service.Endpoint[I, E, O, SI, SO]): Stub[I, E, O, SI, SO] = {
58-
cache.get.map { c =>
59-
c(ep.id).asInstanceOf[I => F[O]]
60-
}
61-
}
62-
}
63-
}
64-
}
31+
service.impl(interpreter)
6532
}
6633

6734
private val jsoniterCodecGlobalCache = Json.jsoniter.createCache()
@@ -80,7 +47,7 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg],
8047
endpointSpec match {
8148
case EndpointSpec.Notification(methodName) =>
8249
val coerce = coerceUnit[O](smithy4sEndpoint.output)
83-
channel.notificationStub[I](methodName).andThen(f => f *> coerce)
50+
channel.notificationStub[I](methodName).andThen(f => Monadic[F].doFlatMap(f)(_ => coerce))
8451
case EndpointSpec.Request(methodName) =>
8552
channel.simpleStub[I, O](methodName)
8653
}
@@ -92,8 +59,8 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg],
9259
private object CoerceUnitVisitor extends (Schema ~> F) {
9360
def apply[A](schema: Schema[A]): F[A] = schema match {
9461
case s @ Schema.StructSchema(_, _, _, make) if s.isUnit =>
95-
MonadThrow[F].unit.asInstanceOf[F[A]]
96-
case _ => MonadThrow[F].raiseError[A](NotUnitReturnType)
62+
Monadic[F].doPure(()).asInstanceOf[F[A]]
63+
case _ => Monadic[F].doRaiseError[A](NotUnitReturnType)
9764
}
9865
}
9966

0 commit comments

Comments
 (0)