Skip to content

Commit 191d028

Browse files
[ServiceBus] No longer create a copy of the array in the message converter for the default and stream case (Azure#29946)
* No longer create a copy of the array in the message converter for the default case and when handling streams * Apply same principles to StreamToBytes * Remove unneeded method * Align StreamToBytes * Collapse stream processing * Add proposal for the changelog
1 parent 576c4dc commit 191d028

File tree

2 files changed

+19
-38
lines changed

2 files changed

+19
-38
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
### Other Changes
1212

13+
- Reduced memory allocations when converting messages into the underlying AMQP primitives. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
14+
1315
## 7.9.0 (2022-07-11)
1416

1517
### Features Added

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -145,28 +145,34 @@ private static AmqpMessage BuildAmqpBatchFromMessages(
145145
///
146146
private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream)
147147
{
148-
if (stream == null)
149-
{
150-
return new ArraySegment<byte>();
151-
}
152-
153148
switch (stream)
154149
{
150+
case { Length: < 1 }:
151+
return default;
152+
155153
case BufferListStream bufferListStream:
156154
return bufferListStream.ReadBytes((int)stream.Length);
157155

158156
case MemoryStream memStreamSource:
159157
{
160158
using var memStreamCopy = new MemoryStream((int)(memStreamSource.Length - memStreamSource.Position));
161159
memStreamSource.CopyTo(memStreamCopy, StreamBufferSizeInBytes);
162-
return new ArraySegment<byte>(memStreamCopy.ToArray());
160+
if (!memStreamCopy.TryGetBuffer(out ArraySegment<byte> segment))
161+
{
162+
segment = new ArraySegment<byte>(memStreamCopy.ToArray());
163+
}
164+
return segment;
163165
}
164166

165167
default:
166168
{
167-
using var memStream = new MemoryStream(StreamBufferSizeInBytes);
168-
stream.CopyTo(memStream, StreamBufferSizeInBytes);
169-
return new ArraySegment<byte>(memStream.ToArray());
169+
using var memStreamCopy = new MemoryStream(StreamBufferSizeInBytes);
170+
stream.CopyTo(memStreamCopy, StreamBufferSizeInBytes);
171+
if (!memStreamCopy.TryGetBuffer(out ArraySegment<byte> segment))
172+
{
173+
segment = new ArraySegment<byte>(memStreamCopy.ToArray());
174+
}
175+
return segment;
170176
}
171177
}
172178
}
@@ -689,7 +695,7 @@ internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType
689695
case PropertyValueType.Stream:
690696
if (mappingType == MappingType.ApplicationProperty)
691697
{
692-
amqpObject = StreamToBytes((Stream)netObject);
698+
amqpObject = ReadStreamToArraySegment((Stream)netObject);
693699
}
694700
break;
695701
case PropertyValueType.Uri:
@@ -706,7 +712,7 @@ internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType
706712
{
707713
if (mappingType == MappingType.ApplicationProperty)
708714
{
709-
amqpObject = StreamToBytes(netObjectAsStream);
715+
amqpObject = ReadStreamToArraySegment(netObjectAsStream);
710716
}
711717
}
712718
else if (mappingType == MappingType.ApplicationProperty)
@@ -822,33 +828,6 @@ private static bool TryGetNetObjectFromAmqpObject(object amqpObject, MappingType
822828
return netObject != null;
823829
}
824830

825-
private static ArraySegment<byte> StreamToBytes(Stream stream)
826-
{
827-
ArraySegment<byte> buffer;
828-
if (stream == null || stream.Length < 1)
829-
{
830-
buffer = default;
831-
}
832-
else
833-
{
834-
using (var memoryStream = new MemoryStream(512))
835-
{
836-
stream.CopyTo(memoryStream, 512);
837-
buffer = new ArraySegment<byte>(memoryStream.ToArray());
838-
}
839-
}
840-
841-
return buffer;
842-
}
843-
844-
private static Data ToData(AmqpMessage message)
845-
{
846-
ArraySegment<byte>[] payload = message.GetPayload();
847-
var buffer = new BufferListStream(payload);
848-
ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);
849-
return new Data { Value = value };
850-
}
851-
852831
internal static AmqpMap GetSqlRuleFilterMap(SqlRuleFilter sqlRuleFilter)
853832
{
854833
var amqpFilterMap = new AmqpMap

0 commit comments

Comments
 (0)