diff --git a/build.sbt b/build.sbt index 65108b7..e8d995d 100644 --- a/build.sbt +++ b/build.sbt @@ -121,7 +121,7 @@ val smithy4s = projectMatrix .nativePlatform(Seq(scala3)) .disablePlugins(AssemblyPlugin) .enablePlugins(Smithy4sCodegenPlugin) - .dependsOn(fs2) + .dependsOn(core) .settings( name := "jsonrpclib-smithy4s", commonSettings, @@ -169,7 +169,7 @@ val exampleClient = projectMatrix val exampleSmithyShared = projectMatrix .in(file("modules") / "examples/smithyShared") .jvmPlatform(List(scala213), commonJvmSettings) - .dependsOn(smithy4s) + .dependsOn(smithy4s, fs2) .enablePlugins(Smithy4sCodegenPlugin) .settings( commonSettings, diff --git a/modules/core/src/main/scala/jsonrpclib/Monadic.scala b/modules/core/src/main/scala/jsonrpclib/Monadic.scala index 5168dd5..0d5a7f0 100644 --- a/modules/core/src/main/scala/jsonrpclib/Monadic.scala +++ b/modules/core/src/main/scala/jsonrpclib/Monadic.scala @@ -8,9 +8,13 @@ trait Monadic[F[_]] { def doPure[A](a: A): F[A] def doAttempt[A](fa: F[A]): F[Either[Throwable, A]] def doRaiseError[A](e: Throwable): F[A] + def doMap[A, B](fa: F[A])(f: A => B): F[B] = doFlatMap(fa)(a => doPure(f(a))) + def doVoid[A](fa: F[A]): F[Unit] = doMap(fa)(_ => ()) } object Monadic { + def apply[F[_]](implicit F: Monadic[F]): Monadic[F] = F + implicit def monadicFuture(implicit ec: ExecutionContext): Monadic[Future] = new Monadic[Future] { def doFlatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f) @@ -19,5 +23,7 @@ object Monadic { def doAttempt[A](fa: Future[A]): Future[Either[Throwable, A]] = fa.map(Right(_)).recover(Left(_)) def doRaiseError[A](e: Throwable): Future[A] = Future.failed(e) + + override def doMap[A, B](fa: Future[A])(f: A => B): Future[B] = fa.map(f) } } diff --git a/modules/examples/client/src/main/scala/examples/client/ClientMain.scala b/modules/examples/client/src/main/scala/examples/client/ClientMain.scala index 5097f2d..1173094 100644 --- a/modules/examples/client/src/main/scala/examples/client/ClientMain.scala +++ b/modules/examples/client/src/main/scala/examples/client/ClientMain.scala @@ -32,7 +32,7 @@ object ClientMain extends IOApp.Simple { // Starting the server rp <- fs2.Stream.resource(Processes[IO].spawn(process.ProcessBuilder("java", "-jar", serverJar))) // Creating a channel that will be used to communicate to the server - fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some) + fs2Channel <- FS2Channel.stream[IO](cancelTemplate = cancelEndpoint.some) _ <- Stream(()) .concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin)) .concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce)) diff --git a/modules/examples/server/src/main/scala/examples/server/ServerMain.scala b/modules/examples/server/src/main/scala/examples/server/ServerMain.scala index 72c9804..445274e 100644 --- a/modules/examples/server/src/main/scala/examples/server/ServerMain.scala +++ b/modules/examples/server/src/main/scala/examples/server/ServerMain.scala @@ -28,7 +28,8 @@ object ServerMain extends IOApp.Simple { def run: IO[Unit] = { // Using errorln as stdout is used by the RPC channel IO.consoleForIO.errorln("Starting server") >> - FS2Channel[IO](cancelTemplate = Some(cancelEndpoint)) + FS2Channel + .stream[IO](cancelTemplate = Some(cancelEndpoint)) .flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel .flatMap(channel => fs2.Stream diff --git a/modules/examples/smithyClient/src/main/scala/examples/smithy/client/ClientMain.scala b/modules/examples/smithyClient/src/main/scala/examples/smithy/client/ClientMain.scala index ee3b8c2..06871b2 100644 --- a/modules/examples/smithyClient/src/main/scala/examples/smithy/client/ClientMain.scala +++ b/modules/examples/smithyClient/src/main/scala/examples/smithy/client/ClientMain.scala @@ -32,11 +32,11 @@ object SmithyClientMain extends IOApp.Simple { // Starting the server rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar) // Creating a channel that will be used to communicate to the server - fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some) + fs2Channel <- FS2Channel.stream[IO](cancelTemplate = cancelEndpoint.some) // Mounting our implementation of the generated interface onto the channel _ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client)) // Creating stubs to talk to the remote server - server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel) + server: TestServer[IO] = ClientStub(test.TestServer, fs2Channel) _ <- Stream(()) .concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin)) .concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce)) diff --git a/modules/examples/smithyServer/src/main/scala/examples/smithy/server/ServerMain.scala b/modules/examples/smithyServer/src/main/scala/examples/smithy/server/ServerMain.scala index 79075bb..ed81d28 100644 --- a/modules/examples/smithyServer/src/main/scala/examples/smithy/server/ServerMain.scala +++ b/modules/examples/smithyServer/src/main/scala/examples/smithy/server/ServerMain.scala @@ -24,13 +24,11 @@ object ServerMain extends IOApp.Simple { def run: IO[Unit] = { val run = - FS2Channel[IO](cancelTemplate = Some(cancelEndpoint)) + FS2Channel + .stream[IO](cancelTemplate = Some(cancelEndpoint)) .flatMap { channel => - ClientStub - .stream(TestClient, channel) - .flatMap { testClient => - channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient))) - } + val testClient = ClientStub(TestClient, channel) + channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient))) } .flatMap { channel => fs2.Stream diff --git a/modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala b/modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala index 2be38ab..c2cfa78 100644 --- a/modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala +++ b/modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala @@ -70,9 +70,15 @@ object FS2Channel { } yield impl } + @deprecated("use stream or resource", "0.0.9") def apply[F[_]: Concurrent]( bufferSize: Int = 2048, cancelTemplate: Option[CancelTemplate] = None + ): Stream[F, FS2Channel[F]] = stream(bufferSize, cancelTemplate) + + def stream[F[_]: Concurrent]( + bufferSize: Int = 2048, + cancelTemplate: Option[CancelTemplate] = None ): Stream[F, FS2Channel[F]] = Stream.resource(resource(bufferSize, cancelTemplate)) private case class State[F[_]]( diff --git a/modules/fs2/src/main/scala/jsonrpclib/fs2/package.scala b/modules/fs2/src/main/scala/jsonrpclib/fs2/package.scala index c77c114..f36ab31 100644 --- a/modules/fs2/src/main/scala/jsonrpclib/fs2/package.scala +++ b/modules/fs2/src/main/scala/jsonrpclib/fs2/package.scala @@ -24,6 +24,10 @@ package object fs2 { def doAttempt[A](fa: F[A]): F[Either[Throwable, A]] = MonadThrow[F].attempt(fa) def doRaiseError[A](e: Throwable): F[A] = MonadThrow[F].raiseError(e) + + override def doMap[A, B](fa: F[A])(f: A => B): F[B] = Monad[F].map(fa)(f) + + override def doVoid[A](fa: F[A]): F[Unit] = Monad[F].void(fa) } } diff --git a/modules/fs2/src/test/scala/jsonrpclib/fs2/FS2ChannelSpec.scala b/modules/fs2/src/test/scala/jsonrpclib/fs2/FS2ChannelSpec.scala index 43b7c60..e6c94dc 100644 --- a/modules/fs2/src/test/scala/jsonrpclib/fs2/FS2ChannelSpec.scala +++ b/modules/fs2/src/test/scala/jsonrpclib/fs2/FS2ChannelSpec.scala @@ -30,8 +30,8 @@ object FS2ChannelSpec extends SimpleIOSuite { def setup(cancelTemplate: CancelTemplate, endpoints: Endpoint[IO]*) = setupAux(endpoints, Some(cancelTemplate)) def setupAux(endpoints: Seq[Endpoint[IO]], cancelTemplate: Option[CancelTemplate]): Stream[IO, ClientSideChannel] = { for { - serverSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate) - clientSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate) + serverSideChannel <- FS2Channel.stream[IO](cancelTemplate = cancelTemplate) + clientSideChannel <- FS2Channel.stream[IO](cancelTemplate = cancelTemplate) _ <- serverSideChannel.withEndpointsStream(endpoints) _ <- Stream(()) .concurrently(clientSideChannel.output.through(serverSideChannel.input)) diff --git a/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ClientStub.scala b/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ClientStub.scala index f6e56ea..ee8c71e 100644 --- a/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ClientStub.scala +++ b/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ClientStub.scala @@ -1,67 +1,34 @@ package jsonrpclib.smithy4sinterop import smithy4s.~> -import cats.MonadThrow -import jsonrpclib.fs2._ import smithy4s.Service import smithy4s.schema._ -import cats.effect.kernel.Async -import smithy4s.kinds.PolyFunction5 import smithy4s.ShapeId -import cats.syntax.all._ import smithy4s.json.Json import jsonrpclib.Codec._ import com.github.plokhotnyuk.jsoniter_scala.core._ +import jsonrpclib.Channel +import jsonrpclib.Monadic object ClientStub { - def apply[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit - F: Async[F] - ): F[service.Impl[F]] = new ClientStub(service, channel).compile - - def stream[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit - F: Async[F] - ): fs2.Stream[F, service.Impl[F]] = fs2.Stream.eval(new ClientStub(service, channel).compile) + def apply[Alg[_[_, _, _, _, _]], F[_]: Monadic](service: Service[Alg], channel: Channel[F]): service.Impl[F] = + new ClientStub(service, channel).compile } -private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], channel: FS2Channel[F])(implicit - F: Async[F] -) { - - def compile: F[service.Impl[F]] = precompileAll.map { stubCache => - val interpreter = new service.FunctorInterpreter[F] { - def apply[I, E, O, SI, SO](op: service.Operation[I, E, O, SI, SO]): F[O] = { - val smithy4sEndpoint = service.endpoint(op) - val input = service.input(op) - (stubCache(smithy4sEndpoint): F[I => F[O]]).flatMap { stub => - stub(input) - } +private class ClientStub[Alg[_[_, _, _, _, _]], F[_]: Monadic](val service: Service[Alg], channel: Channel[F]) { + + def compile: service.Impl[F] = { + val interpreter = new service.FunctorEndpointCompiler[F] { + def apply[I, E, O, SI, SO](e: service.Endpoint[I, E, O, SI, SO]): I => F[O] = { + val shapeId = e.id + val spec = EndpointSpec.fromHints(e.hints).toRight(NotJsonRPCEndpoint(shapeId)).toTry.get + + jsonRPCStub(e, spec) } } - service.fromPolyFunction(interpreter) - } - private type Stub[I, E, O, SI, SO] = F[I => F[O]] - private val precompileAll: F[PolyFunction5[service.Endpoint, Stub]] = { - F.ref(Map.empty[ShapeId, Any]).flatMap { cache => - service.endpoints.toList - .traverse_ { ep => - val shapeId = ep.id - EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).flatMap { epSpec => - val stub = jsonRPCStub(ep, epSpec) - cache.update(_ + (shapeId -> stub)) - } - } - .as { - new PolyFunction5[service.Endpoint, Stub] { - def apply[I, E, O, SI, SO](ep: service.Endpoint[I, E, O, SI, SO]): Stub[I, E, O, SI, SO] = { - cache.get.map { c => - c(ep.id).asInstanceOf[I => F[O]] - } - } - } - } - } + service.impl(interpreter) } private val jsoniterCodecGlobalCache = Json.jsoniter.createCache() @@ -80,7 +47,7 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], endpointSpec match { case EndpointSpec.Notification(methodName) => val coerce = coerceUnit[O](smithy4sEndpoint.output) - channel.notificationStub[I](methodName).andThen(f => f *> coerce) + channel.notificationStub[I](methodName).andThen(f => Monadic[F].doFlatMap(f)(_ => coerce)) case EndpointSpec.Request(methodName) => channel.simpleStub[I, O](methodName) } @@ -92,8 +59,8 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], private object CoerceUnitVisitor extends (Schema ~> F) { def apply[A](schema: Schema[A]): F[A] = schema match { case s @ Schema.StructSchema(_, _, _, make) if s.isUnit => - MonadThrow[F].unit.asInstanceOf[F[A]] - case _ => MonadThrow[F].raiseError[A](NotUnitReturnType) + Monadic[F].doPure(()).asInstanceOf[F[A]] + case _ => Monadic[F].doRaiseError[A](NotUnitReturnType) } } diff --git a/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/EndpointSpec.scala b/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/EndpointSpec.scala index 2e29930..4dd4386 100644 --- a/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/EndpointSpec.scala +++ b/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/EndpointSpec.scala @@ -2,8 +2,8 @@ package jsonrpclib.smithy4sinterop import smithy4s.Hints -sealed trait EndpointSpec -object EndpointSpec { +private[smithy4sinterop] sealed trait EndpointSpec +private[smithy4sinterop] object EndpointSpec { case class Notification(methodName: String) extends EndpointSpec case class Request(methodName: String) extends EndpointSpec diff --git a/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ServerEndpoints.scala b/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ServerEndpoints.scala index 39b476a..a6593f7 100644 --- a/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ServerEndpoints.scala +++ b/modules/smithy4s/src/main/scala/jsonrpclib/smithy4sinterop/ServerEndpoints.scala @@ -1,10 +1,7 @@ package jsonrpclib.smithy4sinterop import _root_.smithy4s.{Endpoint => Smithy4sEndpoint} -import cats.MonadThrow -import cats.syntax.all._ import jsonrpclib.Endpoint -import jsonrpclib.fs2._ import smithy4s.Service import smithy4s.kinds.FunctorAlgebra import smithy4s.kinds.FunctorInterpreter @@ -12,12 +9,13 @@ import smithy4s.json.Json import smithy4s.schema.Schema import jsonrpclib.Codec._ import com.github.plokhotnyuk.jsoniter_scala.core._ +import jsonrpclib.Monadic object ServerEndpoints { def apply[Alg[_[_, _, _, _, _]], F[_]]( impl: FunctorAlgebra[Alg, F] - )(implicit service: Service[Alg], F: MonadThrow[F]): List[Endpoint[F]] = { + )(implicit service: Service[Alg], F: Monadic[F]): List[Endpoint[F]] = { val interpreter: service.FunctorInterpreter[F] = service.toPolyFunction(impl) service.endpoints.toList.flatMap { smithy4sEndpoint => EndpointSpec @@ -35,7 +33,7 @@ object ServerEndpoints { Json.jsoniter.fromSchema(schema, jsoniterCodecGlobalCache) // TODO : codify errors at smithy level and handle them. - def jsonRPCEndpoint[F[_]: MonadThrow, Op[_, _, _, _, _], I, E, O, SI, SO]( + def jsonRPCEndpoint[F[_]: Monadic, Op[_, _, _, _, _], I, E, O, SI, SO]( smithy4sEndpoint: Smithy4sEndpoint[Op, I, E, O, SI, SO], endpointSpec: EndpointSpec, impl: FunctorInterpreter[Op, F] @@ -48,7 +46,7 @@ object ServerEndpoints { case EndpointSpec.Notification(methodName) => Endpoint[F](methodName).notification { (input: I) => val op = smithy4sEndpoint.wrap(input) - impl(op).void + Monadic[F].doVoid(impl(op)) } case EndpointSpec.Request(methodName) => Endpoint[F](methodName).simple { (input: I) => diff --git a/project/plugins.sbt b/project/plugins.sbt index 00f2eb5..399d7d0 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -14,6 +14,6 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.4") -addSbtPlugin("com.disneystreaming.smithy4s" % "smithy4s-sbt-codegen" % "0.18.34") +addSbtPlugin("com.disneystreaming.smithy4s" % "smithy4s-sbt-codegen" % "0.18.35") addDependencyTreePlugin