Skip to content

Commit da59746

Browse files
committed
add func handler
1 parent aeb2d4d commit da59746

File tree

7 files changed

+70
-32
lines changed

7 files changed

+70
-32
lines changed
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using System.Threading.Tasks;
2+
13
namespace SocketIOClient.V2.Observers;
24

35
public interface IMyObserver<in T>
46
{
5-
void OnNext(T protocolMessage);
7+
Task OnNextAsync(T protocolMessage);
68
}

src/SocketIOClient/V2/Protocol/Http/HttpAdapter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private async Task HandleResponseAsync(IHttpResponse response)
5959
var incomingMessage = await GetMessageAsync(response);
6060
foreach (var observer in _observers)
6161
{
62-
observer.OnNext(incomingMessage);
62+
await observer.OnNextAsync(incomingMessage);
6363
}
6464
}
6565

src/SocketIOClient/V2/Session/HttpSession.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,16 @@ public HttpSession(
3636

3737
public int PendingDeliveryCount => _messageQueue.Count;
3838

39-
public void OnNext(ProtocolMessage protocolMessage)
39+
public Task OnNextAsync(ProtocolMessage protocolMessage)
4040
{
4141
if (protocolMessage.Type == ProtocolMessageType.Bytes)
4242
{
4343
OnNextBytesMessage(protocolMessage.Bytes);
44-
return;
44+
return Task.CompletedTask;
4545
}
4646
var messages = _engineIOAdapter.GetMessages(protocolMessage.Text);
4747
HandleMessages(messages);
48+
return Task.CompletedTask;
4849
}
4950

5051
private void HandleMessages(IEnumerable<ProtocolMessage> messages)
@@ -92,7 +93,7 @@ private void NotifyObservers(IMessage message)
9293
{
9394
foreach (var observer in _observers)
9495
{
95-
observer.OnNext(message);
96+
observer.OnNextAsync(message);
9697
}
9798
}
9899

src/SocketIOClient/V2/SocketIO.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class SocketIO : ISocketIO
2424

2525

2626
private readonly Dictionary<int, Action<IAckMessage>> _ackHandlers = new();
27-
private readonly Dictionary<int, Func<SocketIOResponse, Task>> _funcHandlers = new();
27+
private readonly Dictionary<int, Func<IAckMessage, Task>> _funcHandlers = new();
2828
private readonly SocketIOOptions _options;
2929

3030

@@ -42,15 +42,6 @@ public SocketIO(Uri uri, SocketIOOptions options)
4242
{
4343
}
4444

45-
private IEngineIOAdapter NewEnginIOAdapter()
46-
{
47-
if (_options.EIO == EngineIO.V3)
48-
{
49-
return new EngineIO3Adapter();
50-
}
51-
return new EngineIO4Adapter();
52-
}
53-
5445
public Task ConnectAsync()
5546
{
5647
_session = SessionFactory.New(_options.EIO);
@@ -81,12 +72,27 @@ public async Task EmitAsync(string eventName, Action<IAckMessage> ack)
8172
_ackHandlers.Add(PacketId, ack);
8273
}
8374

84-
public void OnNext(IMessage message)
75+
public async Task EmitAsync(string eventName, Func<IAckMessage, Task> ack)
76+
{
77+
ThrowIfNotConnected();
78+
PacketId++;
79+
await _session.SendAsync([eventName], CancellationToken.None);
80+
_funcHandlers.Add(PacketId, ack);
81+
}
82+
83+
public async Task OnNextAsync(IMessage message)
8584
{
8685
if (message.Type == MessageType.Ack)
8786
{
8887
var ackMessage = (IAckMessage)message;
89-
_ackHandlers[ackMessage.Id](ackMessage);
88+
if (_ackHandlers.TryGetValue(ackMessage.Id, out var ack))
89+
{
90+
ack(ackMessage);
91+
}
92+
else if (_funcHandlers.TryGetValue(ackMessage.Id, out var func))
93+
{
94+
await func(ackMessage);
95+
}
9096
}
9197
}
9298
}

tests/SocketIOClient.UnitTests/V2/Protocol/Http/HttpAdapterTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public async Task SendProtocolMessageAsync_WhenCalled_OnNextShouldBeTriggered()
2727

2828
await _httpAdapter.SendAsync(new ProtocolMessage(), CancellationToken.None);
2929

30-
observer.Received().OnNext(Arg.Any<ProtocolMessage>());
30+
observer.Received().OnNextAsync(Arg.Any<ProtocolMessage>());
3131
}
3232

3333
[Fact]
@@ -38,6 +38,6 @@ public async Task SendHttpRequestAsync_WhenCalled_OnNextShouldBeTriggered()
3838

3939
await _httpAdapter.SendAsync(new HttpRequest(), CancellationToken.None);
4040

41-
observer.Received().OnNext(Arg.Any<ProtocolMessage>());
41+
observer.Received().OnNextAsync(Arg.Any<ProtocolMessage>());
4242
}
4343
}

tests/SocketIOClient.UnitTests/V2/Session/HttpSessionTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public async Task Integration_HttpAdapterPushedMessages_MessagesWillBeForwardedT
129129
session.Subscribe(observer);
130130
var captured = new List<IMessage>();
131131
observer
132-
.When(x => x.OnNext(Arg.Any<IMessage>()))
132+
.When(x => x.OnNextAsync(Arg.Any<IMessage>()))
133133
.Do(call => captured.Add(call.Arg<IMessage>()));
134134

135135
await httpAdapter.SendAsync(new ProtocolMessage(), CancellationToken.None);
@@ -164,9 +164,9 @@ public void OnNext_BinaryMessageIsNotReady_NoMessageWillBePushed()
164164
};
165165
_engineIOAdapter.GetMessages(Arg.Any<string>()).Returns([protocolMessage]);
166166

167-
_session.OnNext(protocolMessage);
167+
_session.OnNextAsync(protocolMessage);
168168

169-
observer.Received(0).OnNext(Arg.Any<IMessage>());
169+
observer.Received(0).OnNextAsync(Arg.Any<IMessage>());
170170
_session.PendingDeliveryCount.Should().Be(1);
171171
}
172172

@@ -194,12 +194,12 @@ public void OnNext_BinaryMessageReady_MessageWillBePushed()
194194
Type = ProtocolMessageType.Bytes,
195195
},
196196
]);
197-
_session.OnNext(new ProtocolMessage
197+
_session.OnNextAsync(new ProtocolMessage
198198
{
199199
Type = ProtocolMessageType.Text,
200200
});
201201

202-
observer.Received(1).OnNext(Arg.Any<IBinaryMessage>());
202+
observer.Received(1).OnNextAsync(Arg.Any<IBinaryMessage>());
203203
_session.PendingDeliveryCount.Should().Be(0);
204204
}
205205

@@ -223,12 +223,12 @@ public void OnNext_BinaryAckMessageIsNotReady_NoMessageWillBePushed()
223223
Type = ProtocolMessageType.Text,
224224
},
225225
]);
226-
_session.OnNext(new ProtocolMessage
226+
_session.OnNextAsync(new ProtocolMessage
227227
{
228228
Type = ProtocolMessageType.Text,
229229
});
230230

231-
observer.Received(0).OnNext(Arg.Any<IMessage>());
231+
observer.Received(0).OnNextAsync(Arg.Any<IMessage>());
232232
_session.PendingDeliveryCount.Should().Be(1);
233233
}
234234

@@ -253,16 +253,16 @@ public void OnNext_BinaryAckMessageReady_MessageWillBePushed()
253253
Type = ProtocolMessageType.Text,
254254
},
255255
]);
256-
_session.OnNext(new ProtocolMessage
256+
_session.OnNextAsync(new ProtocolMessage
257257
{
258258
Type = ProtocolMessageType.Text,
259259
});
260-
_session.OnNext(new ProtocolMessage
260+
_session.OnNextAsync(new ProtocolMessage
261261
{
262262
Type = ProtocolMessageType.Bytes,
263263
});
264264

265-
observer.Received(1).OnNext(Arg.Any<IBinaryMessage>());
265+
observer.Received(1).OnNextAsync(Arg.Any<IBinaryMessage>());
266266
_session.PendingDeliveryCount.Should().Be(0);
267267
}
268268
}

tests/SocketIOClient.UnitTests/V2/SocketIOTests.cs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ await _io.Invoking(x => x.EmitAsync("event", _ => { }))
3535
}
3636

3737
[Fact]
38-
public async Task EmitAsync_AckEvent_PacketIdIncrementBy1()
38+
public async Task EmitAsync_AckEventAction_PacketIdIncrementBy1()
3939
{
4040
await _io.ConnectAsync();
4141
await _io.EmitAsync("event", _ => { });
@@ -44,7 +44,7 @@ public async Task EmitAsync_AckEvent_PacketIdIncrementBy1()
4444
}
4545

4646
[Fact]
47-
public async Task EmitAsync_AckEventAndGotResponse_HandlerIsCalled()
47+
public async Task EmitAsync_AckEventActionAndGotResponse_HandlerIsCalled()
4848
{
4949
var ackCalled = false;
5050

@@ -54,7 +54,36 @@ public async Task EmitAsync_AckEventAndGotResponse_HandlerIsCalled()
5454
{
5555
Id = _io.PacketId,
5656
};
57-
_io.OnNext(ackMessage);
57+
_io.OnNextAsync(ackMessage);
58+
59+
ackCalled.Should().BeTrue();
60+
}
61+
62+
[Fact]
63+
public async Task EmitAsync_AckEventFunc_PacketIdIncrementBy1()
64+
{
65+
await _io.ConnectAsync();
66+
await _io.EmitAsync("event", _ => Task.CompletedTask);
67+
68+
_io.PacketId.Should().Be(1);
69+
}
70+
71+
[Fact]
72+
public async Task EmitAsync_AckEventFuncAndGotResponse_HandlerIsCalled()
73+
{
74+
var ackCalled = false;
75+
76+
await _io.ConnectAsync();
77+
await _io.EmitAsync("event", _ =>
78+
{
79+
ackCalled = true;
80+
return Task.CompletedTask;
81+
});
82+
var ackMessage = new SystemJsonAckMessage
83+
{
84+
Id = _io.PacketId,
85+
};
86+
_io.OnNextAsync(ackMessage);
5887

5988
ackCalled.Should().BeTrue();
6089
}

0 commit comments

Comments
 (0)