Skip to content

Commit 3ca85da

Browse files
committed
framework: fixed accept loop of the simple server
1 parent 22cb6fb commit 3ca85da

File tree

1 file changed

+53
-28
lines changed
  • ouroboros-network/framework/lib/Ouroboros/Network/Server

1 file changed

+53
-28
lines changed

ouroboros-network/framework/lib/Ouroboros/Network/Server/Simple.hs

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{-# LANGUAGE FlexibleContexts #-}
22
{-# LANGUAGE GADTs #-}
33
{-# LANGUAGE LambdaCase #-}
4+
{-# LANGUAGE NamedFieldPuns #-}
45
{-# LANGUAGE RankNTypes #-}
56
{-# LANGUAGE ScopedTypeVariables #-}
67

@@ -12,7 +13,6 @@ module Ouroboros.Network.Server.Simple where
1213

1314
import Control.Applicative (Alternative)
1415
import Control.Concurrent.JobPool qualified as JobPool
15-
import Control.Monad (forever)
1616
import Control.Monad.Class.MonadAsync
1717
import Control.Monad.Class.MonadFork (MonadFork)
1818
import Control.Monad.Class.MonadSTM
@@ -28,10 +28,18 @@ import Network.Mux qualified as Mx
2828
import Ouroboros.Network.ConnectionId
2929
import Ouroboros.Network.Mux
3030
import Ouroboros.Network.Protocol.Handshake
31-
import Ouroboros.Network.Snocket as Snocket
31+
import Ouroboros.Network.Server (isECONNABORTED)
32+
import Ouroboros.Network.Snocket (Snocket)
33+
import Ouroboros.Network.Snocket qualified as Snocket
3234
import Ouroboros.Network.Socket
3335

3436

37+
-- TODO: add tracers:
38+
--
39+
-- * accept errors,
40+
-- * errors thrown by a connection handler thread,
41+
-- * mux tracers
42+
--
3543
with :: forall fd addr vNumber vData m a b.
3644
( Alternative (STM m),
3745
MonadAsync m,
@@ -60,37 +68,54 @@ with sn makeBearer configureSock addr handshakeArgs versions k =
6068
configureSock sd addr
6169
Snocket.bind sn sd addr
6270
Snocket.listen sn sd
63-
addr' <- getLocalAddr sn sd
71+
addr' <- Snocket.getLocalAddr sn sd
6472
pure (sd, addr'))
6573
(Snocket.close sn . fst)
6674
(\(sock, addr') ->
6775
-- accept loop
68-
withAsync (forever $ acceptOne jobPool sock) (k addr')
76+
withAsync (Snocket.accept sn sock >>= acceptLoop jobPool) (k addr')
6977
)
7078
where
71-
acceptOne :: JobPool.JobPool () m () -> fd -> m ()
72-
acceptOne jobPool sock = accept sn sock >>= runAccept >>= \case
73-
(Accepted sock' remoteAddr, _) -> do
74-
let connThread = do
75-
-- connection responder thread
76-
let connId = ConnectionId addr remoteAddr
77-
bearer <- Mx.getBearer makeBearer (-1) sock' Nothing
78-
configureSock sock' addr
79-
r <- runHandshakeServer bearer connId handshakeArgs versions
80-
case r of
81-
Left (HandshakeProtocolLimit e) -> throwIO e
82-
Left (HandshakeProtocolError e) -> throwIO e
83-
Right HandshakeQueryResult {} -> error "handshake query is not supported"
84-
Right (HandshakeNegotiationResult (SomeResponderApplication app) vNumber vData) -> do
85-
mux <- Mx.new Mx.nullTracers (toMiniProtocolInfos (runForkPolicy noBindForkPolicy (remoteAddress connId)) app)
86-
withAsync (Mx.run mux bearer) $ \aid -> do
87-
void $ simpleMuxCallback connId vNumber vData app mux aid
79+
acceptLoop :: JobPool.JobPool () m ()
80+
-> Snocket.Accept m fd addr
81+
-> m Void
82+
acceptLoop jobPool Snocket.Accept { Snocket.runAccept } = do
83+
(accepted, acceptNext) <- runAccept
84+
acceptOne accepted
85+
acceptLoop jobPool acceptNext
86+
where
87+
-- handle accept failures and fork a connection thread which performs
88+
-- a handshake and runs mux
89+
acceptOne :: Snocket.Accepted fd addr -> m ()
90+
acceptOne (Snocket.AcceptFailure e)
91+
| Just ioErr <- fromException e
92+
, isECONNABORTED ioErr
93+
= return ()
94+
acceptOne (Snocket.AcceptFailure e)
95+
= throwIO e
8896

89-
errorHandler = \e -> throwIO e
97+
acceptOne (Snocket.Accepted sock' remoteAddr) = do
98+
let connThread = do
99+
-- connection responder thread
100+
let connId = ConnectionId addr remoteAddr
101+
bearer <- Mx.getBearer makeBearer (-1) sock' Nothing
102+
configureSock sock' addr
103+
r <- runHandshakeServer bearer connId handshakeArgs versions
104+
case r of
105+
Left (HandshakeProtocolLimit e) -> throwIO e
106+
Left (HandshakeProtocolError e) -> throwIO e
107+
Right HandshakeQueryResult {} -> error "handshake query is not supported"
108+
Right (HandshakeNegotiationResult (SomeResponderApplication app) vNumber vData) -> do
109+
mux <- Mx.new Mx.nullTracers
110+
(toMiniProtocolInfos
111+
(runForkPolicy noBindForkPolicy (remoteAddress connId))
112+
app)
113+
withAsync (Mx.run mux bearer) $ \aid -> do
114+
void $ simpleMuxCallback connId vNumber vData app mux aid
90115

91-
JobPool.forkJob jobPool
92-
$ JobPool.Job connThread
93-
errorHandler
94-
()
95-
"conn-thread"
96-
(AcceptFailure e, _) -> throwIO e
116+
errorHandler = \e -> throwIO e
117+
JobPool.forkJob jobPool
118+
$ JobPool.Job connThread
119+
errorHandler
120+
()
121+
"conn-thread"

0 commit comments

Comments
 (0)