Skip to content

Commit bb473b4

Browse files
committed
refactor EngineIO3Adapter
1 parent e3f63ed commit bb473b4

File tree

5 files changed

+109
-120
lines changed

5 files changed

+109
-120
lines changed

src/SocketIOClient/V2/Session/EngineIOHttpAdapter/EngineIO3Adapter.cs

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4-
using System.Threading.Tasks;
5-
using SocketIOClient.V2.Observers;
4+
using SocketIOClient.V2.Protocol;
65
using SocketIOClient.V2.Protocol.Http;
76

87
namespace SocketIOClient.V2.Session.EngineIOHttpAdapter;
98

109
public class EngineIO3Adapter : IEngineIOAdapter
1110
{
12-
private readonly IList<IMyObserver<string>> _textObservers = new List<IMyObserver<string>>();
13-
private readonly IList<IMyObserver<byte[]>> _byteObservers = new List<IMyObserver<byte[]>>();
14-
1511
public IHttpRequest ToHttpRequest(ICollection<byte[]> bytes)
1612
{
1713
if (!bytes.Any())
@@ -59,61 +55,31 @@ public IHttpRequest ToHttpRequest(string content)
5955
};
6056
}
6157

62-
private void NotifyTextObservers(string text)
63-
{
64-
foreach (var observer in _textObservers)
65-
{
66-
observer.OnNext(text);
67-
}
68-
}
69-
70-
public async Task OnNextAsync(IHttpResponse response)
58+
public IEnumerable<ProtocolMessage> GetMessages(string text)
7159
{
72-
var text = await response.ReadAsStringAsync();
7360
var p = 0;
7461
while (true)
7562
{
7663
var index = text.IndexOf(':', p);
7764
if (index == -1)
7865
{
79-
// TODO: can't handle this message
8066
break;
8167
}
8268
var lengthStr = text.Substring(p, index - p);
8369
if (int.TryParse(lengthStr, out var length))
8470
{
8571
var msg = text.Substring(index + 1, length);
86-
NotifyTextObservers(msg);
72+
yield return new ProtocolMessage { Text = msg };
8773
}
8874
else
8975
{
90-
// TODO: can't handle this message
9176
break;
9277
}
9378
p = index + length + 1;
9479
if (p >= text.Length)
9580
{
96-
// TODO: can't handle this message
9781
break;
9882
}
9983
}
10084
}
101-
102-
public void Subscribe(IMyObserver<string> observer)
103-
{
104-
if (_textObservers.Contains(observer))
105-
{
106-
return;
107-
}
108-
_textObservers.Add(observer);
109-
}
110-
111-
public void Subscribe(IMyObserver<byte[]> observer)
112-
{
113-
if (_byteObservers.Contains(observer))
114-
{
115-
return;
116-
}
117-
_byteObservers.Add(observer);
118-
}
11985
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using System.Collections.Generic;
2-
using SocketIOClient.V2.Observers;
2+
using SocketIOClient.V2.Protocol;
33
using SocketIOClient.V2.Protocol.Http;
44

55
namespace SocketIOClient.V2.Session.EngineIOHttpAdapter;
66

7-
public interface IEngineIOAdapter : IMyAsyncObserver<IHttpResponse>, IMyObservable<string>, IMyObservable<byte[]>
7+
public interface IEngineIOAdapter
88
{
99
IHttpRequest ToHttpRequest(ICollection<byte[]> bytes);
1010
IHttpRequest ToHttpRequest(string content);
11+
IEnumerable<ProtocolMessage> GetMessages(string text);
1112
}

src/SocketIOClient/V2/Session/HttpSession.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,28 @@ public HttpSession(
3838

3939
public void OnNext(ProtocolMessage protocolMessage)
4040
{
41-
if (protocolMessage.Type == ProtocolMessageType.Text)
41+
if (protocolMessage.Type == ProtocolMessageType.Bytes)
4242
{
43-
OnNextTextMessage(protocolMessage.Text);
43+
OnNextBytesMessage(protocolMessage.Bytes);
4444
return;
4545
}
46-
OnNextBytesMessage(protocolMessage.Bytes);
46+
var messages = _engineIOAdapter.GetMessages(protocolMessage.Text);
47+
HandleMessages(messages);
48+
}
49+
50+
private void HandleMessages(IEnumerable<ProtocolMessage> messages)
51+
{
52+
foreach (var message in messages)
53+
{
54+
if (message.Type == ProtocolMessageType.Bytes)
55+
{
56+
OnNextBytesMessage(message.Bytes);
57+
}
58+
else
59+
{
60+
OnNextTextMessage(message.Text);
61+
}
62+
}
4763
}
4864

4965
private void OnNextTextMessage(string text)

tests/SocketIOClient.UnitTests/V2/Session/EngineIOHttpAdapter/EngineIO3AdapterTests.cs

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
using System.Threading.Tasks;
55
using FluentAssertions;
66
using JetBrains.Annotations;
7-
using NSubstitute;
8-
using SocketIOClient.V2.Observers;
97
using SocketIOClient.V2.Protocol.Http;
108
using SocketIOClient.V2.Session.EngineIOHttpAdapter;
119
using Xunit;
@@ -104,57 +102,41 @@ public void ToHttpRequest_WhenCalled_AlwaysPass(ICollection<byte[]> bytes, IHttp
104102
req.Should().BeEquivalentTo(result);
105103
}
106104

107-
private static readonly (string raw, IEnumerable<string> texts, IEnumerable<byte[]> bytes) Text1NoBytes = new(
108-
"1:2",
109-
["2"],
110-
new List<byte[]>());
105+
private static readonly (string raw, IEnumerable<string> textMessages) GetMessagesSinglePing = new("1:2", ["2"]);
111106

112-
private static readonly (string raw, IEnumerable<string> texts, IEnumerable<byte[]> bytes) Text12NoBytes = new(
107+
private static readonly (string raw, IEnumerable<string> textMessages) GetMessagesSingleHelloWorld = new(
113108
"12:hello world!",
114-
["hello world!"],
115-
new List<byte[]>());
109+
["hello world!"]);
116110

117-
private static readonly (string raw, IEnumerable<string> texts, IEnumerable<byte[]> bytes) Text1And12NoBytes = new(
111+
private static readonly (string raw, IEnumerable<string> textMessages) GetMessagesPingAndHelloWorld = new(
118112
"1:212:hello world!",
119-
["2", "hello world!"],
120-
new List<byte[]>());
113+
["2", "hello world!"]);
121114

122-
private static IEnumerable<(string raw, IEnumerable<string> texts, IEnumerable<byte[]> bytes)> OnNextAsyncStrongTypeCases
123-
{
124-
get
115+
public static TheoryData<string, IEnumerable<string>> GetMessagesCases =>
116+
new()
125117
{
126-
yield return Text1NoBytes;
127-
yield return Text12NoBytes;
128-
yield return Text1And12NoBytes;
129-
}
130-
}
131-
132-
public static IEnumerable<object[]> OnNextAsyncCases =>
133-
OnNextAsyncStrongTypeCases.Select(x => new object[] { x.raw, x.texts, x.bytes });
118+
{
119+
GetMessagesSinglePing.raw,
120+
GetMessagesSinglePing.textMessages
121+
},
122+
{
123+
GetMessagesSingleHelloWorld.raw,
124+
GetMessagesSingleHelloWorld.textMessages
125+
},
126+
{
127+
GetMessagesPingAndHelloWorld.raw,
128+
GetMessagesPingAndHelloWorld.textMessages
129+
},
130+
};
134131

135132
[Theory]
136-
[MemberData(nameof(OnNextAsyncCases))]
137-
public async Task OnNextAsync_WhenCalled_AlwaysPass(string raw, IEnumerable<string> texts, IEnumerable<byte[]> bytes)
133+
[MemberData(nameof(GetMessagesCases))]
134+
public void GetMessages_WhenCalled_AlwaysPass(string raw, IEnumerable<string> textMessages)
138135
{
139-
var capturedTexts = new List<string>();
140-
var capturedBytes = new List<byte[]>();
141-
var textObserver = Substitute.For<IMyObserver<string>>();
142-
textObserver
143-
.When(x => x.OnNext(Arg.Any<string>()))
144-
.Do(x => capturedTexts.Add(x.Arg<string>()));
145-
_adapter.Subscribe(textObserver);
146-
var byteObserver = Substitute.For<IMyObserver<byte[]>>();
147-
byteObserver
148-
.When(x => x.OnNext(Arg.Any<byte[]>()))
149-
.Do(x => capturedBytes.Add(x.Arg<byte[]>()));
150-
_adapter.Subscribe(byteObserver);
151-
152-
var response = Substitute.For<IHttpResponse>();
153-
response.ReadAsStringAsync().Returns(raw);
154-
await _adapter.OnNextAsync(response);
155-
156-
capturedTexts.Should().Equal(texts);
157-
capturedBytes.Should().Equal(bytes);
136+
_adapter.GetMessages(raw)
137+
.Select(m => m.Text)
138+
.Should()
139+
.BeEquivalentTo(textMessages);
158140
}
159141

160142
[Theory]

tests/SocketIOClient.UnitTests/V2/Session/HttpSessionTests.cs

Lines changed: 58 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,39 @@ await _session.Invoking(async x =>
8686
}
8787

8888
[Fact]
89-
public async Task ConnectAsync_GivenHttpResponse_MessageWillBePushed()
89+
public async Task Integration_HttpAdapterPushedMessages_MessagesWillBeForwardedToSubscribersOfHttpSession()
9090
{
91-
var captured = new List<IMessage>();
92-
91+
var httpClient = Substitute.For<IHttpClient>();
92+
var httpAdapter = new HttpAdapter(httpClient);
93+
var serializer = new SystemJsonSerializer(new Decapsulator());
94+
var uriConverter = new DefaultUriConverter(4);
95+
var session = new HttpSession(_engineIOAdapter, httpAdapter, serializer, uriConverter)
96+
{
97+
SessionOptions = new SessionOptions
98+
{
99+
ServerUri = new Uri("http://localhost:3000"),
100+
Query = new List<KeyValuePair<string, string>>(),
101+
},
102+
};
93103
var response = Substitute.For<IHttpResponse>();
94-
response.ReadAsStringAsync().Returns("0{\"sid\":\"123\",\"upgrades\":[\"websocket\"],\"pingInterval\":10000,\"pingTimeout\":5000}");
95-
var tuple = GetSubstitutes();
96-
tuple.observer
104+
response.ReadAsStringAsync().Returns("any text");
105+
httpClient.SendAsync(Arg.Any<IHttpRequest>(), Arg.Any<CancellationToken>()).Returns(response);
106+
_engineIOAdapter.GetMessages(Arg.Any<string>())
107+
.Returns([
108+
new ProtocolMessage
109+
{
110+
Type = ProtocolMessageType.Text,
111+
Text = "0{\"sid\":\"123\",\"upgrades\":[\"websocket\"],\"pingInterval\":10000,\"pingTimeout\":5000}",
112+
},
113+
]);
114+
var observer = Substitute.For<IMyObserver<IMessage>>();
115+
session.Subscribe(observer);
116+
var captured = new List<IMessage>();
117+
observer
97118
.When(x => x.OnNext(Arg.Any<IMessage>()))
98119
.Do(call => captured.Add(call.Arg<IMessage>()));
99-
tuple.httpClient.SendAsync(Arg.Any<IHttpRequest>(), Arg.Any<CancellationToken>()).Returns(response);
100120

101-
await tuple.session.ConnectAsync(CancellationToken.None);
121+
await httpAdapter.SendAsync(new ProtocolMessage(), CancellationToken.None);
102122

103123
captured.Should()
104124
.BeEquivalentTo(new List<IMessage>
@@ -124,11 +144,13 @@ public void OnNext_BinaryMessageIsNotReady_NoMessageWillBePushed()
124144
{
125145
BytesCount = 1,
126146
});
127-
128-
_session.OnNext(new ProtocolMessage
147+
var protocolMessage = new ProtocolMessage
129148
{
130149
Type = ProtocolMessageType.Text,
131-
});
150+
};
151+
_engineIOAdapter.GetMessages(Arg.Any<string>()).Returns([protocolMessage]);
152+
153+
_session.OnNext(protocolMessage);
132154

133155
observer.Received(0).OnNext(Arg.Any<IMessage>());
134156
_session.PendingDeliveryCount.Should().Be(1);
@@ -147,38 +169,26 @@ public void OnNext_BinaryMessageReady_MessageWillBePushed()
147169
BytesCount = 1,
148170
});
149171

172+
_engineIOAdapter.GetMessages(Arg.Any<string>())
173+
.Returns([
174+
new ProtocolMessage
175+
{
176+
Type = ProtocolMessageType.Text,
177+
},
178+
new ProtocolMessage
179+
{
180+
Type = ProtocolMessageType.Bytes,
181+
},
182+
]);
150183
_session.OnNext(new ProtocolMessage
151184
{
152185
Type = ProtocolMessageType.Text,
153186
});
154-
_session.OnNext(new ProtocolMessage
155-
{
156-
Type = ProtocolMessageType.Bytes,
157-
});
158187

159188
observer.Received(1).OnNext(Arg.Any<IBinaryMessage>());
160189
_session.PendingDeliveryCount.Should().Be(0);
161190
}
162191

163-
private (HttpSession session, HttpAdapter adapter, IHttpClient httpClient, IMyObserver<IMessage> observer) GetSubstitutes()
164-
{
165-
var httpClient = Substitute.For<IHttpClient>();
166-
var httpAdapter = new HttpAdapter(httpClient);
167-
var serializer = new SystemJsonSerializer(new Decapsulator());
168-
var uriConverter = new DefaultUriConverter(4);
169-
var session = new HttpSession(_engineIOAdapter, httpAdapter, serializer, uriConverter)
170-
{
171-
SessionOptions = new SessionOptions
172-
{
173-
ServerUri = new Uri("http://localhost:3000"),
174-
Query = new List<KeyValuePair<string, string>>(),
175-
},
176-
};
177-
var observer = Substitute.For<IMyObserver<IMessage>>();
178-
session.Subscribe(observer);
179-
return (session, httpAdapter, httpClient, observer);
180-
}
181-
182192
[Fact]
183193
public void OnNext_BinaryAckMessageIsNotReady_NoMessageWillBePushed()
184194
{
@@ -192,6 +202,13 @@ public void OnNext_BinaryAckMessageIsNotReady_NoMessageWillBePushed()
192202
BytesCount = 1,
193203
});
194204

205+
_engineIOAdapter.GetMessages(Arg.Any<string>())
206+
.Returns([
207+
new ProtocolMessage
208+
{
209+
Type = ProtocolMessageType.Text,
210+
},
211+
]);
195212
_session.OnNext(new ProtocolMessage
196213
{
197214
Type = ProtocolMessageType.Text,
@@ -215,6 +232,13 @@ public void OnNext_BinaryAckMessageReady_MessageWillBePushed()
215232
BytesCount = 1,
216233
});
217234

235+
_engineIOAdapter.GetMessages(Arg.Any<string>())
236+
.Returns([
237+
new ProtocolMessage
238+
{
239+
Type = ProtocolMessageType.Text,
240+
},
241+
]);
218242
_session.OnNext(new ProtocolMessage
219243
{
220244
Type = ProtocolMessageType.Text,

0 commit comments

Comments
 (0)