Skip to content

Commit 28f5d91

Browse files
authored
[WebPubSub]Implement WebSocket Client (Azure#32478)
# Contributing to the Azure SDK Please see our [CONTRIBUTING.md](https://github.com/Azure/azure-sdk-for-net/blob/main/CONTRIBUTING.md) if you are not familiar with contributing to this repository or have questions. For specific information about pull request etiquette and best practices, see [this section](https://github.com/Azure/azure-sdk-for-net/blob/main/CONTRIBUTING.md#pull-request-etiquette-and-best-practices).
1 parent 2cbcfd0 commit 28f5d91

13 files changed

+352
-64
lines changed

sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Azure.Messaging.WebPubSub.Client.sln

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ EndProject
88
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.WebPubSub.Client.Tests", "tests\Azure.Messaging.WebPubSub.Client.Tests.csproj", "{0B6AAD06-F541-4931-A661-16BE3BE24EEF}"
99
EndProject
1010
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{E5726134-6AA7-4AED-A654-53C92F09485A}"
11-
ProjectSection(SolutionItems) = preProject
12-
tests\test.runsettings = tests\test.runsettings
13-
EndProjectSection
1411
EndProject
1512
Global
1613
GlobalSection(SolutionConfigurationPlatforms) = preSolution

sdk/webpubsub/Azure.Messaging.WebPubSub.Client/api/Azure.Messaging.WebPubSub.Client.netstandard2.1.cs renamed to sdk/webpubsub/Azure.Messaging.WebPubSub.Client/api/Azure.Messaging.WebPubSub.Client.netstandard2.0.cs

File renamed without changes.

sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Azure.Messaging.WebPubSub.Client.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
<PackageTags>azure;webpubsub.client</PackageTags>
77
<NoWarn>$(NoWarn);0067</NoWarn>
88
<IncludeOperationsSharedSource>true</IncludeOperationsSharedSource>
9-
<RequiredTargetFrameworks>netstandard2.1</RequiredTargetFrameworks>
109
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
1110
</PropertyGroup>
1211
<ItemGroup>
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics.Tracing;
7+
using System.Text;
8+
using Azure.Core.Diagnostics;
9+
10+
namespace Azure.Messaging.WebPubSub.Clients
11+
{
12+
[EventSource(Name = EventSourceName)]
13+
internal class WebPubSubClientEventSource: AzureEventSource
14+
{
15+
private const string EventSourceName = "Azure-Messaging-WebPubSub-Client";
16+
17+
private WebPubSubClientEventSource() : base(EventSourceName)
18+
{
19+
}
20+
21+
// Having event ids defined as const makes it easy to keep track of them
22+
private const int ClientStartingId = 1;
23+
private const int ClientStateChangedId = 2;
24+
private const int WebSocketConnectingId = 3;
25+
private const int WebSocketClosedId = 4;
26+
private const int FailedToProcessMessageId = 5;
27+
private const int FailedToReceiveBytesId = 6;
28+
private const int FailedToChangeClientStateId = 7;
29+
private const int StopRecoveryId = 8;
30+
private const int FailedToRecoverConnectionId = 9;
31+
private const int FailedToInvokeEventId = 10;
32+
private const int ConnectionConnectedId = 11;
33+
private const int ConnectionDisconnectedId = 12;
34+
private const int FailedToReconnectId = 13;
35+
36+
public static WebPubSubClientEventSource Log { get; } = new WebPubSubClientEventSource();
37+
38+
[Event(1, Level = EventLevel.Informational, Message = "Client is starting.")]
39+
public virtual void ClientStarting()
40+
{
41+
if (IsEnabled())
42+
{
43+
WriteEvent(ClientStartingId);
44+
}
45+
}
46+
47+
[Event(2, Level = EventLevel.Verbose, Message = "The client state changed from the '{0}' state to the '{1}' state.")]
48+
public virtual void ClientStateChanged(string newState, string currentState)
49+
{
50+
if (IsEnabled())
51+
{
52+
WriteEvent(ClientStateChangedId, currentState, newState);
53+
}
54+
}
55+
56+
[Event(3, Level = EventLevel.Verbose, Message = "A new WebSocket connection is starting to connect with subprotocol {0}.")]
57+
public virtual void WebSocketConnecting(string subprotocol)
58+
{
59+
if (IsEnabled())
60+
{
61+
WriteEvent(WebSocketConnectingId, subprotocol);
62+
}
63+
}
64+
65+
[Event(4, Level = EventLevel.Verbose, Message = "WebSocket connection is closed.")]
66+
public virtual void WebSocketClosed()
67+
{
68+
if (IsEnabled())
69+
{
70+
WriteEvent(WebSocketClosedId);
71+
}
72+
}
73+
74+
[Event(5, Level = EventLevel.Warning, Message = "An exception occurred while processing message from the service. Error Message: {0}")]
75+
public virtual void FailedToProcessMessage(string errorMessage)
76+
{
77+
if (IsEnabled())
78+
{
79+
WriteEvent(FailedToProcessMessageId, errorMessage);
80+
}
81+
}
82+
83+
[Event(6, Level = EventLevel.Informational, Message = "An exception occurred while receiving bytes. Error Message: {0}")]
84+
public virtual void FailedToReceiveBytes(string errorMessage)
85+
{
86+
if (IsEnabled())
87+
{
88+
WriteEvent(FailedToReceiveBytesId, errorMessage);
89+
}
90+
}
91+
92+
[Event(7, Level = EventLevel.Warning, Message = "The client failed to change from the '{0}' state to the '{1}' state because it was actually in the '{2}' state.")]
93+
public virtual void FailedToChangeClientState(string expectedState, string newState, string currentState)
94+
{
95+
if (IsEnabled())
96+
{
97+
WriteEvent(FailedToChangeClientStateId, expectedState, newState, currentState);
98+
}
99+
}
100+
101+
[Event(8, Level = EventLevel.Warning, Message = "Stop try to recover the connection with connection ID: {0} cause of {1}.")]
102+
public virtual void StopRecovery(string connectionId, string reason)
103+
{
104+
if (IsEnabled())
105+
{
106+
WriteEvent(StopRecoveryId, connectionId, reason);
107+
}
108+
}
109+
110+
[Event(9, Level = EventLevel.Informational, Message = "An attempt to recover connection with connection ID: {0} failed, will retry later. Error Message: {1}")]
111+
public virtual void FailedToRecoverConnection(string connectionId, string errorMessage)
112+
{
113+
if (IsEnabled())
114+
{
115+
WriteEvent(FailedToRecoverConnectionId, connectionId, errorMessage);
116+
}
117+
}
118+
119+
[Event(10, Level = EventLevel.Warning, Message = "An exception occurred while invoking event handler {0}. Error Message: {1}")]
120+
public virtual void FailedToInvokeEvent(string eventName, string errorMessage)
121+
{
122+
if (IsEnabled())
123+
{
124+
WriteEvent(FailedToInvokeEventId, eventName, errorMessage);
125+
}
126+
}
127+
128+
[Event(11, Level = EventLevel.Informational, Message = "Connection with connection ID: {0} is connected.")]
129+
public virtual void ConnectionConnected(string connectionId)
130+
{
131+
if (IsEnabled())
132+
{
133+
WriteEvent(ConnectionConnectedId, connectionId);
134+
}
135+
}
136+
137+
[Event(12, Level = EventLevel.Informational, Message = "Connection with connection ID: {0} is disconnected.")]
138+
public virtual void ConnectionDisconnected(string connectionId)
139+
{
140+
if (IsEnabled())
141+
{
142+
WriteEvent(ConnectionDisconnectedId, connectionId);
143+
}
144+
}
145+
146+
[Event(13, Level = EventLevel.Warning, Message = "An attempt to reconnect connection who is the successor of connection ID: {0} failed, will retry later. Error Message: {1}")]
147+
public virtual void FailedToReconnect(string connectionId, string errorMessage)
148+
{
149+
if (IsEnabled())
150+
{
151+
WriteEvent(FailedToReconnectId, connectionId, errorMessage);
152+
}
153+
}
154+
}
155+
}

sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/MemoryBufferWriter.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,10 @@ private async Task CopyToSlowAsync(Stream destination)
197197
for (var i = 0; i < count; i++)
198198
{
199199
var segment = _completedSegments[i];
200-
await destination.WriteAsync(segment.Buffer.AsMemory(0, segment.Length), CancellationToken.None).ConfigureAwait(false);
200+
await destination.WriteAsync(segment.Buffer, 0, segment.Length, CancellationToken.None).ConfigureAwait(false);
201201
}
202202
}
203-
204-
await destination.WriteAsync(_currentSegment.AsMemory(0, _position)).ConfigureAwait(false);
203+
await destination.WriteAsync(_currentSegment, 0, _position).ConfigureAwait(false);
205204
}
206205

207206
public byte[] ToArray()

sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ private static void WriteData(IBufferWriter<byte> buffer, Utf8JsonWriter writer,
429429
var span = buffer.GetSpan(length);
430430
span[0] = ListSeparator;
431431
span[1] = Quote;
432-
DataPropertyNameBytes.EncodedUtf8Bytes.CopyTo(span[2..]);
432+
DataPropertyNameBytes.EncodedUtf8Bytes.CopyTo(span.Slice(2));
433433
span[length - 2] = Quote;
434434
span[length - 1] = KeyValueSeperator;
435435
buffer.Advance(length);
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.Net.WebSockets;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Azure.Core.Pipeline;
10+
11+
namespace Azure.Messaging.WebPubSub.Clients
12+
{
13+
internal sealed partial class WebSocketClient : IWebSocketClient
14+
{
15+
private readonly ClientWebSocket _socket;
16+
private readonly Uri _uri;
17+
private readonly string _protocol;
18+
private readonly MemoryBufferWriter _buffer;
19+
20+
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1);
21+
22+
public WebSocketClient(Uri uri, string protocol)
23+
{
24+
_protocol = protocol;
25+
_socket = new ClientWebSocket();
26+
_socket.Options.AddSubProtocol(_protocol);
27+
_uri = uri;
28+
_buffer = new MemoryBufferWriter();
29+
}
30+
31+
public void Dispose()
32+
{
33+
_sendLock.Dispose();
34+
_socket.Dispose();
35+
_buffer.Dispose();
36+
}
37+
38+
public async Task ConnectAsync(CancellationToken token)
39+
{
40+
WebPubSubClientEventSource.Log.WebSocketConnecting(_protocol);
41+
42+
await _socket.ConnectAsync(_uri, token).ConfigureAwait(false);
43+
}
44+
45+
public async Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
46+
{
47+
await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
48+
try
49+
{
50+
await _socket.SendAsync(new ArraySegment<byte>(buffer.ToArray()) , messageType, endOfMessage, cancellationToken).ConfigureAwait(false);
51+
}
52+
finally
53+
{
54+
_sendLock.Release();
55+
}
56+
}
57+
58+
public async Task<WebSocketReadResult> ReceiveOneFrameAsync(CancellationToken token)
59+
{
60+
token.ThrowIfCancellationRequested();
61+
62+
if (_socket.State == WebSocketState.Closed)
63+
{
64+
return new WebSocketReadResult(default, true);
65+
}
66+
67+
_buffer.Reset();
68+
var type = await ReceiveOneFrameAsync(_buffer, _socket, token).ConfigureAwait(false);
69+
if (type == WebSocketMessageType.Close)
70+
{
71+
if (_socket.State == WebSocketState.CloseReceived)
72+
{
73+
try
74+
{
75+
await _socket.CloseOutputAsync(_socket.CloseStatus ?? WebSocketCloseStatus.EndpointUnavailable, null, default).ConfigureAwait(false);
76+
}
77+
catch { }
78+
}
79+
80+
return new WebSocketReadResult(default, true, _socket.CloseStatus);
81+
}
82+
83+
return new WebSocketReadResult(_buffer.AsReadOnlySequence());
84+
}
85+
86+
public async Task StopAsync(CancellationToken token)
87+
{
88+
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, token).ConfigureAwait(false);
89+
}
90+
91+
internal void Abort()
92+
{
93+
_socket.Abort();
94+
}
95+
96+
private static async Task<WebSocketMessageType> ReceiveOneFrameAsync(IBufferWriter<byte> buffer, WebSocket socket, CancellationToken token)
97+
{
98+
if (token.IsCancellationRequested)
99+
{
100+
throw new OperationCanceledException();
101+
}
102+
103+
var memory = buffer.GetMemory();
104+
var receiveResult = await ReadSocketAsync(socket, memory, token).ConfigureAwait(false);
105+
106+
if (receiveResult.MessageType == WebSocketMessageType.Close)
107+
{
108+
return WebSocketMessageType.Close;
109+
}
110+
111+
buffer.Advance(receiveResult.Count);
112+
113+
while (!receiveResult.EndOfMessage)
114+
{
115+
memory = buffer.GetMemory();
116+
receiveResult = await ReadSocketAsync(socket, memory, token).ConfigureAwait(false);
117+
118+
// Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read
119+
if (receiveResult.MessageType == WebSocketMessageType.Close)
120+
{
121+
return WebSocketMessageType.Close;
122+
}
123+
124+
buffer.Advance(receiveResult.Count);
125+
}
126+
127+
return receiveResult.MessageType;
128+
}
129+
130+
private static async Task<WebSocketReceiveResult> ReadSocketAsync(WebSocket socket, Memory<byte> destination, CancellationToken token)
131+
{
132+
var array = new ArraySegment<byte>(new byte[destination.Length]);
133+
var receiveResult = await socket.ReceiveAsync(array, token).ConfigureAwait(false);
134+
array.Array.CopyTo(destination);
135+
return receiveResult;
136+
}
137+
}
138+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Text;
7+
8+
namespace Azure.Messaging.WebPubSub.Clients
9+
{
10+
internal class WebSocketClientFactory: IWebSocketClientFactory
11+
{
12+
public IWebSocketClient CreateWebSocketClient(Uri uri, string protocol)
13+
{
14+
return new WebSocketClient(uri, protocol);
15+
}
16+
}
17+
}
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
4-
<RunSettingsFilePath>$(MSBuildProjectDirectory)\test.runsettings</RunSettingsFilePath>
54
</PropertyGroup>
65
<ItemGroup>
7-
<PackageReference Include="xunit" />
8-
<PackageReference Include="xunit.runner.visualstudio" />
6+
<PackageReference Include="NUnit" />
7+
<PackageReference Include="NUnit3TestAdapter" />
98
<PackageReference Include="Moq" />
109
<PackageReference Include="Microsoft.NET.Test.Sdk" />
1110
<PackageReference Include="Azure.Messaging.WebPubSub" />
1211
</ItemGroup>
1312

14-
<ItemGroup Condition="$(TargetFramework) != 'net461'">
13+
<ItemGroup>
1514
<ProjectReference Include="..\src\Azure.Messaging.WebPubSub.Client.csproj" />
1615
</ItemGroup>
1716
</Project>

0 commit comments

Comments
 (0)