Skip to content

Commit f31b41b

Browse files
authored
[Event Hubs] Buffered Producer Client Infrastructure (enqueue) (#24219)
The focus of these changes is to continue fleshing out the buffered producer client, adding the ability to enqueue events to be published. Pending work: - Background management task - Background publishing task - Flush functionality - Clear functionality - Automatic partition assignment - Partition Key hashing
1 parent c3d77a4 commit f31b41b

File tree

11 files changed

+2171
-94
lines changed

11 files changed

+2171
-94
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
There are any number of "resheader" rows that contain simple
3131
name/value pairs.
3232
33-
Each data row contains a name, and value. The row also contains a
33+
Each data row contains a name, and value. The row also contains a
3434
type or mimetype. Type corresponds to a .NET class that support
3535
text/value conversion through the TypeConverter architecture.
3636
Classes that don't support this are serialized and stored with the
@@ -226,7 +226,7 @@
226226
<value>This information is only available when TrackLastEnqueuedEventProperties is set as part of the options.</value>
227227
</data>
228228
<data name="CannotStartEventProcessorWithoutHandler" xml:space="preserve">
229-
<value>Cannot begin processing without {0} handler set.</value>
229+
<value>Cannot begin processing without the {0} handler set.</value>
230230
</data>
231231
<data name="RunningEventProcessorCannotPerformOperation" xml:space="preserve">
232232
<value>The event processor is already running and needs to be stopped in order to perform this operation.</value>
@@ -321,4 +321,7 @@
321321
<data name="ProcessorLoadBalancingCycleSlowMask" xml:space="preserve">
322322
<value>"A load balancing cycle has taken too long to complete. A slow cycle can cause stability issues with partition ownership. Consider investigating storage latency and thread pool health. Common causes are soft delete being enabled for storage and too many partitions owned. You may also want to consider increasing the 'PartitionOwnershipExpirationInterval' in the processor options. Cycle Duration: '{0:0.00}' seconds. Partition Ownership Interval '{1:0:00}' seconds.")]</value>
323323
</data>
324+
<data name="CannotEnqueueEventWithoutHandler" xml:space="preserve">
325+
<value>Events cannot be enqueued processing without the {0} handler set.</value>
326+
</data>
324327
</root>

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpAnnotatedMessageExtensions.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,19 @@ public static DateTimeOffset GetEnqueuedTime(this AmqpAnnotatedMessage instance,
221221
return defaultValue;
222222
}
223223

224+
/// <summary>
225+
/// Sets the time that an event was enqueued in the partition on an <see cref="AmqpAnnotatedMessage" />.
226+
/// </summary>
227+
///
228+
/// <param name="instance">The instance that this method was invoked on.</param>
229+
/// <param name="enqueueTime">The value to set as the enqueue time.</param>
230+
///
231+
public static void SetEnqueuedTime(this AmqpAnnotatedMessage instance,
232+
DateTimeOffset enqueueTime)
233+
{
234+
instance.MessageAnnotations[AmqpProperty.EnqueuedTime.ToString()] = enqueueTime;
235+
}
236+
224237
/// <summary>
225238
/// Retrieves the partition key of an event from an <see cref="AmqpAnnotatedMessage" />.
226239
/// </summary>
@@ -242,6 +255,19 @@ public static string GetPartitionKey(this AmqpAnnotatedMessage instance,
242255
return defaultValue;
243256
}
244257

258+
/// <summary>
259+
/// Sets the partition key of an event on an <see cref="AmqpAnnotatedMessage" />.
260+
/// </summary>
261+
///
262+
/// <param name="instance">The instance that this method was invoked on.</param>
263+
/// <param name="partitionKey">The value to set for the partition key.</param>
264+
///
265+
public static void SetPartitionKey(this AmqpAnnotatedMessage instance,
266+
string partitionKey)
267+
{
268+
instance.MessageAnnotations[AmqpProperty.PartitionKey.ToString()] = partitionKey;
269+
}
270+
245271
/// <summary>
246272
/// Retrieves the sequence number of the last event published to the partition from an <see cref="AmqpAnnotatedMessage" />.
247273
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,6 +1511,88 @@ public virtual void BufferedProducerManagementTaskError(string identifier,
15111511
}
15121512
}
15131513

1514+
/// <summary>
1515+
/// Indicates that the enqueue of events for publishing has started.
1516+
/// </summary>
1517+
///
1518+
/// <param name="eventHubName">The name of the Event Hub being published to.</param>
1519+
/// <param name="partitionIdOrKey">The identifier of a partition or the partition hash key used for publishing; identifier or key.</param>
1520+
/// <param name="operationId">An artificial identifier for the publishing operation.</param>
1521+
///
1522+
[Event(77, Level = EventLevel.Informational, Message = "Enqueuing events for publishing to Event Hub: {0} (Partition Id/Key: '{1}'), Operation Id: '{2}'.")]
1523+
public virtual void EventEnqueueStart(string eventHubName,
1524+
string partitionIdOrKey,
1525+
string operationId)
1526+
{
1527+
if (IsEnabled())
1528+
{
1529+
WriteEvent(77, eventHubName ?? string.Empty, partitionIdOrKey ?? string.Empty, operationId ?? string.Empty);
1530+
}
1531+
}
1532+
1533+
/// <summary>
1534+
/// Indicates that the enqueue of events for publishing has completed.
1535+
/// </summary>
1536+
///
1537+
/// <param name="eventHubName">The name of the Event Hub being published to.</param>
1538+
/// <param name="partitionIdOrKey">The identifier of a partition or the partition hash key requested when enqueuing the event; identifier or key.</param>
1539+
/// <param name="operationId">An artificial identifier for the publishing operation.</param>
1540+
///
1541+
[Event(78, Level = EventLevel.Informational, Message = "Completed enqueuing events for publishing to Event Hub: {0} (Requested Partition Id/Key: '{1}'), Operation Id: '{2}'.")]
1542+
public virtual void EventEnqueueComplete(string eventHubName,
1543+
string partitionIdOrKey,
1544+
string operationId)
1545+
{
1546+
if (IsEnabled())
1547+
{
1548+
WriteEvent(78, eventHubName ?? string.Empty, partitionIdOrKey ?? string.Empty, operationId ?? string.Empty);
1549+
}
1550+
}
1551+
1552+
/// <summary>
1553+
/// Indicates that an exception was encountered while enqueuing of events for publishing.
1554+
/// </summary>
1555+
///
1556+
/// <param name="eventHubName">The name of the Event Hub being published to.</param>
1557+
/// <param name="partitionIdOrKey">The identifier of a partition or the partition hash key requested when enqueuing the event; identifier or key.</param>
1558+
/// <param name="operationId">An artificial identifier for the publishing operation.</param>
1559+
/// <param name="errorMessage">The message for the exception that occurred.</param>
1560+
///
1561+
[Event(79, Level = EventLevel.Error, Message = "An exception occurred while enqueuing events for publishing to Event Hub: {0} (Requested Partition Id/Key: '{1}'), Operation Id: '{2}'. Error Message: '{3}'")]
1562+
public virtual void EventEnqueueError(string eventHubName,
1563+
string partitionIdOrKey,
1564+
string operationId,
1565+
string errorMessage)
1566+
{
1567+
if (IsEnabled())
1568+
{
1569+
WriteEvent(79, eventHubName ?? string.Empty, partitionIdOrKey ?? string.Empty, operationId ?? string.Empty, errorMessage ?? string.Empty);
1570+
}
1571+
}
1572+
1573+
/// <summary>
1574+
/// Indicates that an event has been assigned a partition as part of enqueuing it to be published has completed.
1575+
/// </summary>
1576+
///
1577+
/// <param name="eventHubName">The name of the Event Hub being published to.</param>
1578+
/// <param name="requestedPartitionIdOrKey">The identifier of a partition or the partition hash key requested when enqueuing the event; identifier or key.</param>
1579+
/// <param name="assignedPartitionId">The identifier of the partition to which the event was assigned.</param>
1580+
/// <param name="operationId">An artificial identifier for the publishing operation.</param>
1581+
/// <param name="totalBufferedEventCount">The total number of buffered events at the time the enqueue was observed.</param>
1582+
///
1583+
[Event(80, Level = EventLevel.Verbose, Message = "An event being enqueued for publishing to Event Hub: {0} (Requested Partition Id/Key: '{1}') for Operation Id: '{2}' has been enqueued for Partition Id: '{3}'. Total Buffered Event Count: {4}.")]
1584+
public virtual void EventEnqueued(string eventHubName,
1585+
string requestedPartitionIdOrKey,
1586+
string assignedPartitionId,
1587+
string operationId,
1588+
int totalBufferedEventCount)
1589+
{
1590+
if (IsEnabled())
1591+
{
1592+
WriteEvent(80, eventHubName ?? string.Empty, requestedPartitionIdOrKey ?? string.Empty, operationId ?? string.Empty, assignedPartitionId ?? string.Empty, totalBufferedEventCount);
1593+
}
1594+
}
1595+
15141596
/// <summary>
15151597
/// Indicates that an exception was encountered in an unexpected code path, not directly associated with
15161598
/// an Event Hubs operation.

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EnqueueEventOptions.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,38 @@ internal class EnqueueEventOptions : SendEventOptions
4040
///
4141
[EditorBrowsable(EditorBrowsableState.Never)]
4242
public override string ToString() => base.ToString();
43+
44+
/// <summary>
45+
/// Deconstructs the instance into its component attributes.
46+
/// </summary>
47+
///
48+
/// <param name="partitionId">The partition identifier specified by the options.</param>
49+
/// <param name="partitionKey">The partition key specified by the options.</param>
50+
///
51+
internal void Deconstruct(out string partitionId,
52+
out string partitionKey)
53+
{
54+
partitionId = PartitionId;
55+
partitionKey = PartitionKey;
56+
}
57+
58+
/// <summary>
59+
/// The set of default attributes for the options, intended to be
60+
/// used as alternative to <see cref="Deconstruct" /> when no options
61+
/// were specified.
62+
/// </summary>
63+
///
64+
/// <returns>A tuple of the default values for the options attributes.</returns>
65+
///
66+
internal static (string PartitionId, string PartitionKey) DeconstructOrUseDefaultAttributes(EnqueueEventOptions options = default)
67+
{
68+
if (options != null)
69+
{
70+
(var partitionId, var partitionKey) = options;
71+
return (partitionId, partitionKey);
72+
}
73+
74+
return (null, null);
75+
}
4376
}
4477
}

0 commit comments

Comments
 (0)