1- using SocketIOClient . Processors ;
1+ using SocketIOClient . Exceptions ;
2+ using SocketIOClient . WebSocketClient ;
23using System ;
34using System . Net . WebSockets ;
4- using System . Threading . Tasks ;
5- using System . Threading ;
65using System . Text ;
7- using SocketIOClient . Exceptions ;
6+ using System . Threading ;
7+ using System . Threading . Tasks ;
88
9- namespace SocketIOClient . WebSocketClient
9+ namespace SocketIOClient . Windows7
1010{
11- /// <summary>
12- /// Internally uses 'System.Net.WebSockets.ClientWebSocket' as websocket client
13- /// </summary>
14- public sealed class ClientWebSocket : IWebSocketClient
11+ public sealed class ClientWebSocketManaged : IWebSocketClient
1512 {
16- public ClientWebSocket ( SocketIO io )
13+ public ClientWebSocketManaged ( )
1714 {
18- _io = io ;
15+ ReceiveChunkSize = 1024 * 16 ;
16+ ConnectionTimeout = TimeSpan . FromSeconds ( 10 ) ;
1917 }
2018
21- const int ReceiveChunkSize = 1024 * 16 ;
19+ public int ReceiveChunkSize { get ; set ; }
20+ public TimeSpan ConnectionTimeout { get ; set ; }
2221
23- readonly SocketIO _io ;
2422 System . Net . WebSockets . Managed . ClientWebSocket _ws ;
25- CancellationTokenSource _wsWorkTokenSource ;
2623 readonly SemaphoreSlim sendLock = new SemaphoreSlim ( 1 , 1 ) ;
27- readonly EngineIOProtocolProcessor processor = new EngineIOProtocolProcessor ( ) ;
24+ CancellationTokenSource _listenToken ;
2825
2926 public Action < System . Net . WebSockets . Managed . ClientWebSocketOptions > Config { get ; set ; }
3027
@@ -38,17 +35,17 @@ public ClientWebSocket(SocketIO io)
3835 /// <returns></returns>
3936 public async Task ConnectAsync ( Uri uri )
4037 {
41- if ( _ws != null )
42- _ws . Dispose ( ) ;
38+ DisposeWebSocketIfNotNull ( ) ;
4339 _ws = new System . Net . WebSockets . Managed . ClientWebSocket ( ) ;
44- Config ? . Invoke ( _ws . Options ) ;
4540
46- _wsWorkTokenSource = new CancellationTokenSource ( ) ;
47- var wsConnectionTokenSource = new CancellationTokenSource ( _io . Options . ConnectionTimeout ) ;
41+ Config ? . Invoke ( _ws . Options ) ;
42+ var wsConnectionTokenSource = new CancellationTokenSource ( ConnectionTimeout ) ;
4843 try
4944 {
5045 await _ws . ConnectAsync ( uri , wsConnectionTokenSource . Token ) ;
51- _ = Task . Run ( ListenAsync ) ;
46+ DisposeListenTokenIfNotNull ( ) ;
47+ _listenToken = new CancellationTokenSource ( ) ;
48+ _ = ListenAsync ( _listenToken . Token ) ;
5249 }
5350 catch ( TaskCanceledException )
5451 {
@@ -64,7 +61,7 @@ public async Task ConnectAsync(Uri uri)
6461 /// <exception cref="InvalidSocketStateException"></exception>
6562 public async Task SendMessageAsync ( string text )
6663 {
67- await SendMessageAsync ( text , _wsWorkTokenSource . Token ) ;
64+ await SendMessageAsync ( text , CancellationToken . None ) ;
6865 }
6966
7067 public async Task SendMessageAsync ( string text , CancellationToken cancellationToken )
@@ -107,9 +104,15 @@ public async Task SendMessageAsync(string text, CancellationToken cancellationTo
107104 /// <exception cref="InvalidSocketStateException"></exception>
108105 public async Task SendMessageAsync ( byte [ ] bytes )
109106 {
110- await SendMessageAsync ( bytes , _wsWorkTokenSource . Token ) ;
107+ await SendMessageAsync ( bytes , CancellationToken . None ) ;
111108 }
112109
110+ /// <summary>
111+ ///
112+ /// </summary>
113+ /// <param name="bytes"></param>
114+ /// <returns></returns>
115+ /// <exception cref="InvalidSocketStateException"></exception>
113116 public async Task SendMessageAsync ( byte [ ] bytes , CancellationToken cancellationToken )
114117 {
115118 if ( _ws == null )
@@ -143,26 +146,29 @@ public async Task SendMessageAsync(byte[] bytes, CancellationToken cancellationT
143146 public async Task DisconnectAsync ( )
144147 {
145148 await _ws . CloseAsync ( WebSocketCloseStatus . NormalClosure , string . Empty , CancellationToken . None ) ;
146- Close ( null ) ;
149+ OnClosed ( "io client disconnect" ) ;
147150 }
148151
149- private async Task ListenAsync ( )
152+ private async Task ListenAsync ( CancellationToken cancellationToken )
150153 {
151154 while ( true )
152155 {
156+ if ( cancellationToken . IsCancellationRequested )
157+ break ;
153158 var buffer = new byte [ ReceiveChunkSize ] ;
154159 int count = 0 ;
155160 WebSocketReceiveResult result = null ;
156161 while ( _ws . State == WebSocketState . Open )
157162 {
158163 try
159164 {
160- //result = await _ws.ReceiveAsync(new ArraySegment<byte>(buffer), _wsWorkTokenSource.Token);
165+ if ( cancellationToken . IsCancellationRequested )
166+ break ;
161167 var subBuffer = new byte [ ReceiveChunkSize ] ;
162- result = await _ws . ReceiveAsync ( new ArraySegment < byte > ( subBuffer ) , CancellationToken . None ) ;
168+ result = await _ws . ReceiveAsync ( new ArraySegment < byte > ( subBuffer ) , cancellationToken ) ;
163169 if ( result . MessageType == WebSocketMessageType . Close )
164170 {
165- Close ( "io server disconnect" ) ;
171+ OnClosed ( "io server disconnect" ) ;
166172 break ;
167173 }
168174 else if ( result . MessageType == WebSocketMessageType . Text || result . MessageType == WebSocketMessageType . Binary )
@@ -181,7 +187,7 @@ private async Task ListenAsync()
181187 }
182188 catch ( WebSocketException e )
183189 {
184- Close ( e . Message ) ;
190+ OnClosed ( e . Message ) ;
185191 break ;
186192 }
187193 }
@@ -195,40 +201,52 @@ private async Task ListenAsync()
195201#if DEBUG
196202 System . Diagnostics . Trace . WriteLine ( $ "⬇ { DateTime . Now } { message } ") ;
197203#endif
198- processor . Process ( new MessageContext
204+ if ( OnTextReceived is null )
199205 {
200- Message = message ,
201- SocketIO = _io
202- } ) ;
206+ throw new ArgumentNullException ( nameof ( OnTextReceived ) ) ;
207+ }
208+ OnTextReceived ( message ) ;
203209 }
204210 else if ( result . MessageType == WebSocketMessageType . Binary )
205211 {
206212#if DEBUG
207213 System . Diagnostics . Trace . WriteLine ( $ "⬇ { DateTime . Now } Binary message") ;
208214#endif
209- byte [ ] bytes ;
210- if ( _io . Options . EIO == 3 )
215+ if ( OnTextReceived is null )
211216 {
212- count -= 1 ;
213- bytes = new byte [ count ] ;
214- Buffer . BlockCopy ( buffer , 1 , bytes , 0 , count ) ;
217+ throw new ArgumentNullException ( nameof ( OnTextReceived ) ) ;
215218 }
216- else
217- {
218- bytes = new byte [ count ] ;
219- Buffer . BlockCopy ( buffer , 0 , bytes , 0 , count ) ;
220- }
221- _io . InvokeBytesReceived ( bytes ) ;
219+ byte [ ] bytes = new byte [ count ] ;
220+ Buffer . BlockCopy ( buffer , 0 , bytes , 0 , count ) ;
221+ OnBinaryReceived ( bytes ) ;
222222 }
223223 }
224224 }
225225
226- private void Close ( string reason )
226+ public Action < string > OnTextReceived { get ; set ; }
227+ public Action < byte [ ] > OnBinaryReceived { get ; set ; }
228+ public Action < string > OnClosed { get ; set ; }
229+
230+ private void DisposeWebSocketIfNotNull ( )
231+ {
232+ if ( _ws != null )
233+ _ws . Dispose ( ) ;
234+ }
235+
236+ private void DisposeListenTokenIfNotNull ( )
227237 {
228- if ( reason != null )
238+ if ( _listenToken != null )
229239 {
230- _io . InvokeDisconnect ( reason ) ;
240+ _listenToken . Cancel ( ) ;
241+ _listenToken . Dispose ( ) ;
231242 }
232243 }
244+
245+ public void Dispose ( )
246+ {
247+ DisposeWebSocketIfNotNull ( ) ;
248+ DisposeListenTokenIfNotNull ( ) ;
249+ sendLock . Dispose ( ) ;
250+ }
233251 }
234252}
0 commit comments