Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ val smithy4s = projectMatrix
.nativePlatform(Seq(scala3))
.disablePlugins(AssemblyPlugin)
.enablePlugins(Smithy4sCodegenPlugin)
.dependsOn(fs2)
.dependsOn(core)
.settings(
name := "jsonrpclib-smithy4s",
commonSettings,
Expand Down Expand Up @@ -169,7 +169,7 @@ val exampleClient = projectMatrix
val exampleSmithyShared = projectMatrix
.in(file("modules") / "examples/smithyShared")
.jvmPlatform(List(scala213), commonJvmSettings)
.dependsOn(smithy4s)
.dependsOn(smithy4s, fs2)
.enablePlugins(Smithy4sCodegenPlugin)
.settings(
commonSettings,
Expand Down
6 changes: 6 additions & 0 deletions modules/core/src/main/scala/jsonrpclib/Monadic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ trait Monadic[F[_]] {
def doPure[A](a: A): F[A]
def doAttempt[A](fa: F[A]): F[Either[Throwable, A]]
def doRaiseError[A](e: Throwable): F[A]
def doMap[A, B](fa: F[A])(f: A => B): F[B] = doFlatMap(fa)(a => doPure(f(a)))
def doVoid[A](fa: F[A]): F[Unit] = doMap(fa)(_ => ())
}

object Monadic {
def apply[F[_]](implicit F: Monadic[F]): Monadic[F] = F

implicit def monadicFuture(implicit ec: ExecutionContext): Monadic[Future] = new Monadic[Future] {
def doFlatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)

Expand All @@ -19,5 +23,7 @@ object Monadic {
def doAttempt[A](fa: Future[A]): Future[Either[Throwable, A]] = fa.map(Right(_)).recover(Left(_))

def doRaiseError[A](e: Throwable): Future[A] = Future.failed(e)

override def doMap[A, B](fa: Future[A])(f: A => B): Future[B] = fa.map(f)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object ClientMain extends IOApp.Simple {
// Starting the server
rp <- fs2.Stream.resource(Processes[IO].spawn(process.ProcessBuilder("java", "-jar", serverJar)))
// Creating a channel that will be used to communicate to the server
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
fs2Channel <- FS2Channel.stream[IO](cancelTemplate = cancelEndpoint.some)
_ <- Stream(())
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin))
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ object ServerMain extends IOApp.Simple {
def run: IO[Unit] = {
// Using errorln as stdout is used by the RPC channel
IO.consoleForIO.errorln("Starting server") >>
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
FS2Channel
.stream[IO](cancelTemplate = Some(cancelEndpoint))
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
.flatMap(channel =>
fs2.Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ object SmithyClientMain extends IOApp.Simple {
// Starting the server
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
// Creating a channel that will be used to communicate to the server
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
fs2Channel <- FS2Channel.stream[IO](cancelTemplate = cancelEndpoint.some)
// Mounting our implementation of the generated interface onto the channel
_ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client))
// Creating stubs to talk to the remote server
server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel)
server: TestServer[IO] = ClientStub(test.TestServer, fs2Channel)
_ <- Stream(())
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin))
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ object ServerMain extends IOApp.Simple {

def run: IO[Unit] = {
val run =
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
FS2Channel
.stream[IO](cancelTemplate = Some(cancelEndpoint))
.flatMap { channel =>
ClientStub
.stream(TestClient, channel)
.flatMap { testClient =>
channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient)))
}
val testClient = ClientStub(TestClient, channel)
channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient)))
}
.flatMap { channel =>
fs2.Stream
Expand Down
6 changes: 6 additions & 0 deletions modules/fs2/src/main/scala/jsonrpclib/fs2/FS2Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,15 @@ object FS2Channel {
} yield impl
}

@deprecated("use stream or resource", "0.0.9")
def apply[F[_]: Concurrent](
bufferSize: Int = 2048,
cancelTemplate: Option[CancelTemplate] = None
): Stream[F, FS2Channel[F]] = stream(bufferSize, cancelTemplate)

def stream[F[_]: Concurrent](
bufferSize: Int = 2048,
cancelTemplate: Option[CancelTemplate] = None
): Stream[F, FS2Channel[F]] = Stream.resource(resource(bufferSize, cancelTemplate))

private case class State[F[_]](
Expand Down
4 changes: 4 additions & 0 deletions modules/fs2/src/main/scala/jsonrpclib/fs2/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ package object fs2 {
def doAttempt[A](fa: F[A]): F[Either[Throwable, A]] = MonadThrow[F].attempt(fa)

def doRaiseError[A](e: Throwable): F[A] = MonadThrow[F].raiseError(e)

override def doMap[A, B](fa: F[A])(f: A => B): F[B] = Monad[F].map(fa)(f)

override def doVoid[A](fa: F[A]): F[Unit] = Monad[F].void(fa)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ object FS2ChannelSpec extends SimpleIOSuite {
def setup(cancelTemplate: CancelTemplate, endpoints: Endpoint[IO]*) = setupAux(endpoints, Some(cancelTemplate))
def setupAux(endpoints: Seq[Endpoint[IO]], cancelTemplate: Option[CancelTemplate]): Stream[IO, ClientSideChannel] = {
for {
serverSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate)
clientSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate)
serverSideChannel <- FS2Channel.stream[IO](cancelTemplate = cancelTemplate)
clientSideChannel <- FS2Channel.stream[IO](cancelTemplate = cancelTemplate)
_ <- serverSideChannel.withEndpointsStream(endpoints)
_ <- Stream(())
.concurrently(clientSideChannel.output.through(serverSideChannel.input))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,67 +1,34 @@
package jsonrpclib.smithy4sinterop

import smithy4s.~>
import cats.MonadThrow
import jsonrpclib.fs2._
import smithy4s.Service
import smithy4s.schema._
import cats.effect.kernel.Async
import smithy4s.kinds.PolyFunction5
import smithy4s.ShapeId
import cats.syntax.all._
import smithy4s.json.Json
import jsonrpclib.Codec._
import com.github.plokhotnyuk.jsoniter_scala.core._
import jsonrpclib.Channel
import jsonrpclib.Monadic

object ClientStub {

def apply[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit
F: Async[F]
): F[service.Impl[F]] = new ClientStub(service, channel).compile

def stream[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit
F: Async[F]
): fs2.Stream[F, service.Impl[F]] = fs2.Stream.eval(new ClientStub(service, channel).compile)
def apply[Alg[_[_, _, _, _, _]], F[_]: Monadic](service: Service[Alg], channel: Channel[F]): service.Impl[F] =
new ClientStub(service, channel).compile
}

private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], channel: FS2Channel[F])(implicit
F: Async[F]
) {

def compile: F[service.Impl[F]] = precompileAll.map { stubCache =>
val interpreter = new service.FunctorInterpreter[F] {
def apply[I, E, O, SI, SO](op: service.Operation[I, E, O, SI, SO]): F[O] = {
val smithy4sEndpoint = service.endpoint(op)
val input = service.input(op)
(stubCache(smithy4sEndpoint): F[I => F[O]]).flatMap { stub =>
stub(input)
}
private class ClientStub[Alg[_[_, _, _, _, _]], F[_]: Monadic](val service: Service[Alg], channel: Channel[F]) {

def compile: service.Impl[F] = {
val interpreter = new service.FunctorEndpointCompiler[F] {
def apply[I, E, O, SI, SO](e: service.Endpoint[I, E, O, SI, SO]): I => F[O] = {
val shapeId = e.id
val spec = EndpointSpec.fromHints(e.hints).toRight(NotJsonRPCEndpoint(shapeId)).toTry.get

jsonRPCStub(e, spec)
Comment on lines +21 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome! :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

smithy4s 0.18 really was a game changer

}
}
service.fromPolyFunction(interpreter)
}

private type Stub[I, E, O, SI, SO] = F[I => F[O]]
private val precompileAll: F[PolyFunction5[service.Endpoint, Stub]] = {
F.ref(Map.empty[ShapeId, Any]).flatMap { cache =>
service.endpoints.toList
.traverse_ { ep =>
val shapeId = ep.id
EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).flatMap { epSpec =>
val stub = jsonRPCStub(ep, epSpec)
cache.update(_ + (shapeId -> stub))
}
}
.as {
new PolyFunction5[service.Endpoint, Stub] {
def apply[I, E, O, SI, SO](ep: service.Endpoint[I, E, O, SI, SO]): Stub[I, E, O, SI, SO] = {
cache.get.map { c =>
c(ep.id).asInstanceOf[I => F[O]]
}
}
}
}
}
service.impl(interpreter)
}

private val jsoniterCodecGlobalCache = Json.jsoniter.createCache()
Expand All @@ -80,7 +47,7 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg],
endpointSpec match {
case EndpointSpec.Notification(methodName) =>
val coerce = coerceUnit[O](smithy4sEndpoint.output)
channel.notificationStub[I](methodName).andThen(f => f *> coerce)
channel.notificationStub[I](methodName).andThen(f => Monadic[F].doFlatMap(f)(_ => coerce))
case EndpointSpec.Request(methodName) =>
channel.simpleStub[I, O](methodName)
}
Expand All @@ -92,8 +59,8 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg],
private object CoerceUnitVisitor extends (Schema ~> F) {
def apply[A](schema: Schema[A]): F[A] = schema match {
case s @ Schema.StructSchema(_, _, _, make) if s.isUnit =>
MonadThrow[F].unit.asInstanceOf[F[A]]
case _ => MonadThrow[F].raiseError[A](NotUnitReturnType)
Monadic[F].doPure(()).asInstanceOf[F[A]]
case _ => Monadic[F].doRaiseError[A](NotUnitReturnType)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package jsonrpclib.smithy4sinterop

import smithy4s.Hints

sealed trait EndpointSpec
object EndpointSpec {
private[smithy4sinterop] sealed trait EndpointSpec
private[smithy4sinterop] object EndpointSpec {
case class Notification(methodName: String) extends EndpointSpec
case class Request(methodName: String) extends EndpointSpec

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package jsonrpclib.smithy4sinterop

import _root_.smithy4s.{Endpoint => Smithy4sEndpoint}
import cats.MonadThrow
import cats.syntax.all._
import jsonrpclib.Endpoint
import jsonrpclib.fs2._
import smithy4s.Service
import smithy4s.kinds.FunctorAlgebra
import smithy4s.kinds.FunctorInterpreter
import smithy4s.json.Json
import smithy4s.schema.Schema
import jsonrpclib.Codec._
import com.github.plokhotnyuk.jsoniter_scala.core._
import jsonrpclib.Monadic

object ServerEndpoints {

def apply[Alg[_[_, _, _, _, _]], F[_]](
impl: FunctorAlgebra[Alg, F]
)(implicit service: Service[Alg], F: MonadThrow[F]): List[Endpoint[F]] = {
)(implicit service: Service[Alg], F: Monadic[F]): List[Endpoint[F]] = {
val interpreter: service.FunctorInterpreter[F] = service.toPolyFunction(impl)
service.endpoints.toList.flatMap { smithy4sEndpoint =>
EndpointSpec
Expand All @@ -35,7 +33,7 @@ object ServerEndpoints {
Json.jsoniter.fromSchema(schema, jsoniterCodecGlobalCache)

// TODO : codify errors at smithy level and handle them.
def jsonRPCEndpoint[F[_]: MonadThrow, Op[_, _, _, _, _], I, E, O, SI, SO](
def jsonRPCEndpoint[F[_]: Monadic, Op[_, _, _, _, _], I, E, O, SI, SO](
smithy4sEndpoint: Smithy4sEndpoint[Op, I, E, O, SI, SO],
endpointSpec: EndpointSpec,
impl: FunctorInterpreter[Op, F]
Expand All @@ -48,7 +46,7 @@ object ServerEndpoints {
case EndpointSpec.Notification(methodName) =>
Endpoint[F](methodName).notification { (input: I) =>
val op = smithy4sEndpoint.wrap(input)
impl(op).void
Monadic[F].doVoid(impl(op))
}
case EndpointSpec.Request(methodName) =>
Endpoint[F](methodName).simple { (input: I) =>
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.4")

addSbtPlugin("com.disneystreaming.smithy4s" % "smithy4s-sbt-codegen" % "0.18.34")
addSbtPlugin("com.disneystreaming.smithy4s" % "smithy4s-sbt-codegen" % "0.18.35")

addDependencyTreePlugin