|
| 1 | +package jsonrpclib |
| 2 | + |
| 3 | +import jsonrpclib.internals._ |
| 4 | + |
| 5 | +import java.util.concurrent.atomic.AtomicLong |
| 6 | +import scala.concurrent.ExecutionContext |
| 7 | +import scala.concurrent.Future |
| 8 | +import scala.concurrent.Promise |
| 9 | +import scala.util.Try |
| 10 | + |
| 11 | +abstract class FutureBasedChannel(endpoints: List[Endpoint[Future]])(implicit ec: ExecutionContext) |
| 12 | + extends MessageDispatcher[Future] { |
| 13 | + |
| 14 | + override def createPromise[A](callId: CallId): Future[(Try[A] => Future[Unit], () => Future[A])] = Future.successful { |
| 15 | + val promise = Promise[A]() |
| 16 | + val fulfill: Try[A] => Future[Unit] = (a: Try[A]) => Future.successful(promise.complete(a)) |
| 17 | + val future: () => Future[A] = () => promise.future |
| 18 | + (fulfill, future) |
| 19 | + } |
| 20 | + |
| 21 | + protected def storePendingCall(callId: CallId, handle: OutputMessage => Future[Unit]): Future[Unit] = |
| 22 | + Future.successful { val _ = pending.put(callId, handle) } |
| 23 | + protected def removePendingCall(callId: CallId): Future[Option[OutputMessage => Future[Unit]]] = |
| 24 | + Future.successful { Option(pending.remove(callId)) } |
| 25 | + protected def getEndpoint(method: String): Future[Option[Endpoint[Future]]] = |
| 26 | + Future.successful(endpointsMap.get(method)) |
| 27 | + protected def sendMessage(message: Message): Future[Unit] = { |
| 28 | + sendPayload(Codec.encode(message)).map(_ => ()) |
| 29 | + } |
| 30 | + protected def nextCallId(): Future[CallId] = Future.successful(CallId.NumberId(nextID.incrementAndGet())) |
| 31 | + |
| 32 | + private[this] val endpointsMap: Map[String, Endpoint[Future]] = endpoints.map(ep => ep.method -> ep).toMap |
| 33 | + private[this] val pending = new java.util.concurrent.ConcurrentHashMap[CallId, OutputMessage => Future[Unit]] |
| 34 | + private[this] val nextID = new AtomicLong(0L) |
| 35 | + // @volatile |
| 36 | + // private[this] var closeReason: Throwable = _ |
| 37 | + |
| 38 | + def sendPayload(msg: Payload): Future[Unit] = ??? |
| 39 | + def reportError(params: Option[Payload], error: ProtocolError, method: String): Future[Unit] = ??? |
| 40 | + |
| 41 | +} |
0 commit comments