Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public override void UnsubscribeAll(Type eventType)
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}

protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
var headers = new Headers
{
Expand All @@ -193,7 +193,7 @@ protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventR
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}

public async override Task PublishFromOutboxAsync(
public override async Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
Expand All @@ -206,13 +206,18 @@ public async override Task PublishFromOutboxAsync(
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!));
}

await PublishAsync(
var result = await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
outgoingEvent.EventName,
outgoingEvent.EventData,
headers
);

if (result.Status != PersistenceStatus.Persisted)
{
throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'.");
}

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -224,7 +229,7 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
}
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
public override async Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
var outgoingEventArray = outgoingEvents.ToArray();
Expand All @@ -242,7 +247,7 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!));
}

producer.Produce(
var result = await producer.ProduceAsync(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Expand All @@ -251,6 +256,11 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
Headers = headers
});

if (result.Status != PersistenceStatus.Persisted)
{
throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'.");
}

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -263,7 +273,7 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
}
}

public async override Task ProcessFromInboxAsync(
public override async Task ProcessFromInboxAsync(
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
Expand All @@ -290,12 +300,16 @@ protected override byte[] Serialize(object eventData)
return Serializer.Serialize(eventData);
}

private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);

return PublishAsync(topicName, eventName, body, headers);
var result = await PublishAsync(topicName, eventName, body, headers);
if (result.Status != PersistenceStatus.Persisted)
{
throw new AbpException($"Failed to publish event '{eventName}' to topic '{topicName}'.");
}
}

private Task<DeliveryResult<string, byte[]>> PublishAsync(
Expand Down
18 changes: 10 additions & 8 deletions framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency
protected ConcurrentDictionary<string, Lazy<IProducer<string, byte[]>>> Producers { get; }

protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);

protected TimeSpan DefaultTransactionsWaitDuration { get; set; } = TimeSpan.FromSeconds(30);

public ILogger<ProducerPool> Logger { get; set; }
Expand All @@ -41,8 +41,10 @@ public virtual IProducer<string, byte[]> Get(string? connectionName = null)
{
var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection).ToDictionary(k => k.Key, v => v.Value));
Options.ConfigureProducer?.Invoke(producerConfig);
producerConfig.Acks ??= Acks.All;
producerConfig.EnableIdempotence ??= true;
return new ProducerBuilder<string, byte[]>(producerConfig).Build();

})).Value;
}

Expand Down Expand Up @@ -70,27 +72,27 @@ public void Dispose()
foreach (var producer in Producers.Values)
{
var poolItemDisposeStopwatch = Stopwatch.StartNew();

try
{
producer.Value.Dispose();
}
catch
{
}

poolItemDisposeStopwatch.Stop();

remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed
? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed)
: TimeSpan.Zero;
}

poolDisposeStopwatch.Stop();

Logger.LogInformation(
$"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms).");

if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0)
{
Logger.LogWarning(
Expand Down
Loading