From b96ec9e47fc42f6b7573598f8cdd52af3f6d1a49 Mon Sep 17 00:00:00 2001 From: ghostbuster91 Date: Thu, 8 May 2025 11:40:49 +0200 Subject: [PATCH] Revert "Remove half-baked FutureBaseChannel" This reverts commit be15b68d642ad75e79d2ae972550fc9a4b570dd2. --- .../internals/FutureBaseChannel.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 modules/core/src/main/scala/jsonrpclib/internals/FutureBaseChannel.scala diff --git a/modules/core/src/main/scala/jsonrpclib/internals/FutureBaseChannel.scala b/modules/core/src/main/scala/jsonrpclib/internals/FutureBaseChannel.scala new file mode 100644 index 0000000..cb73e08 --- /dev/null +++ b/modules/core/src/main/scala/jsonrpclib/internals/FutureBaseChannel.scala @@ -0,0 +1,41 @@ +package jsonrpclib + +import jsonrpclib.internals._ + +import java.util.concurrent.atomic.AtomicLong +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Try + +abstract class FutureBasedChannel(endpoints: List[Endpoint[Future]])(implicit ec: ExecutionContext) + extends MessageDispatcher[Future] { + + override def createPromise[A](callId: CallId): Future[(Try[A] => Future[Unit], () => Future[A])] = Future.successful { + val promise = Promise[A]() + val fulfill: Try[A] => Future[Unit] = (a: Try[A]) => Future.successful(promise.complete(a)) + val future: () => Future[A] = () => promise.future + (fulfill, future) + } + + protected def storePendingCall(callId: CallId, handle: OutputMessage => Future[Unit]): Future[Unit] = + Future.successful { val _ = pending.put(callId, handle) } + protected def removePendingCall(callId: CallId): Future[Option[OutputMessage => Future[Unit]]] = + Future.successful { Option(pending.remove(callId)) } + protected def getEndpoint(method: String): Future[Option[Endpoint[Future]]] = + Future.successful(endpointsMap.get(method)) + protected def sendMessage(message: Message): Future[Unit] = { + sendPayload(Codec.encode(message)).map(_ => ()) + } + protected def nextCallId(): Future[CallId] = Future.successful(CallId.NumberId(nextID.incrementAndGet())) + + private[this] val endpointsMap: Map[String, Endpoint[Future]] = endpoints.map(ep => ep.method -> ep).toMap + private[this] val pending = new java.util.concurrent.ConcurrentHashMap[CallId, OutputMessage => Future[Unit]] + private[this] val nextID = new AtomicLong(0L) + // @volatile + // private[this] var closeReason: Throwable = _ + + def sendPayload(msg: Payload): Future[Unit] = ??? + def reportError(params: Option[Payload], error: ProtocolError, method: String): Future[Unit] = ??? + +}