Skip to content

Commit 39bf725

Browse files
Allow sharding delivery consumer to passivate self (#7670)
* Allow sharding delivery consumer to passivate self * improve code * fix unit test config * Simplify code --------- Co-authored-by: Aaron Stannard <aaron@petabridge.com>
1 parent c444773 commit 39bf725

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="DurableShardingSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Text;
10+
using System.Threading.Tasks;
11+
using Akka.Actor;
12+
using Akka.Actor.Dsl;
13+
using Akka.Cluster.Sharding.Delivery;
14+
using Akka.Configuration;
15+
using Akka.Delivery;
16+
using Akka.Event;
17+
using Akka.Persistence.Delivery;
18+
using Akka.TestKit;
19+
using Xunit;
20+
using Xunit.Abstractions;
21+
using FluentAssertions;
22+
using static Akka.Tests.Delivery.TestConsumer;
23+
24+
namespace Akka.Cluster.Sharding.Tests.Delivery;
25+
26+
public class DurableRememberEntitiesShardingSpec : AkkaSpec
27+
{
28+
private static readonly Config Config =
29+
"""
30+
akka.loglevel = DEBUG
31+
akka.actor.provider = cluster
32+
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
33+
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem"
34+
akka.remote.dot-netty.tcp.port = 0
35+
36+
akka.cluster.sharding.remember-entities = on
37+
akka.cluster.sharding.state-store-mode = ddata
38+
# no leaks between test runs thank you
39+
akka.cluster.sharding.distributed-data.durable.keys = []
40+
akka.cluster.sharding.verbose-debug-logging = on
41+
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
42+
akka.cluster.sharding.entity-restart-backoff = 250ms
43+
""";
44+
45+
public DurableRememberEntitiesShardingSpec(ITestOutputHelper output) : base(Config, output)
46+
{
47+
// TODO: add journal operations subscriptions, once that's properly supported in Akka.Persistence
48+
}
49+
50+
private int _idCount;
51+
52+
private string ProducerId => $"p-{_idCount}";
53+
54+
private int NextId()
55+
{
56+
return _idCount++;
57+
}
58+
59+
private async Task JoinCluster()
60+
{
61+
var cluster = Cluster.Get(Sys);
62+
await cluster.JoinAsync(cluster.SelfAddress);
63+
await AwaitAssertAsync(() => Assert.True(cluster.IsUp));
64+
}
65+
66+
[Fact]
67+
public async Task ReliableDelivery_with_remember_entity_sharding_must_allow_consumer_to_passivate_self_using_Passivate()
68+
{
69+
await JoinCluster();
70+
NextId();
71+
72+
var consumerProbe = CreateTestProbe();
73+
var sharding = await ClusterSharding.Get(system: Sys).StartAsync(
74+
typeName: $"TestConsumer-{_idCount}",
75+
entityPropsFactory: _ => ShardingConsumerController.Create<Job>(
76+
c => Props.Create(() => new Consumer(c, consumerProbe)),
77+
ShardingConsumerController.Settings.Create(Sys)), settings: ClusterShardingSettings.Create(Sys),
78+
messageExtractor: HashCodeMessageExtractor.Create(10, o => string.Empty, o => o));
79+
80+
var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
81+
var shardingProducerController = Sys.ActorOf(
82+
props: ShardingProducerController.Create<Job>(
83+
ProducerId, sharding, durableQueueProps, ShardingProducerController.Settings.Create(Sys)),
84+
name: $"shardingProducerController-{_idCount}");
85+
var producerProbe = CreateTestProbe();
86+
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));
87+
88+
var replyProbe = CreateTestProbe();
89+
var next = await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>();
90+
next.AskNextTo(
91+
msgWithConfirmation: new ShardingProducerController.MessageWithConfirmation<Job>(EntityId: "entity-1", Message: new Job("ping"),
92+
ReplyTo: replyProbe.Ref));
93+
await replyProbe.ExpectMsgAsync<Done>();
94+
95+
consumerProbe.ExpectMsg("pong");
96+
var entity = consumerProbe.LastSender;
97+
await consumerProbe.WatchAsync(entity);
98+
99+
next = await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>();
100+
next.AskNextTo(
101+
msgWithConfirmation: new ShardingProducerController.MessageWithConfirmation<Job>(EntityId: "entity-1", Message: new Job("passivate"),
102+
ReplyTo: replyProbe.Ref));
103+
await replyProbe.ExpectMsgAsync<Done>();
104+
105+
consumerProbe.ExpectMsg("passivate");
106+
await consumerProbe.ExpectTerminatedAsync(entity);
107+
}
108+
109+
private class Consumer : ReceiveActor
110+
{
111+
private readonly IActorRef _consumerController;
112+
public Consumer(IActorRef consumerController, IActorRef consumerProbe)
113+
{
114+
_consumerController = consumerController;
115+
116+
Receive<ConsumerController.Delivery<Job>>(delivery =>
117+
{
118+
Sender.Tell(ConsumerController.Confirmed.Instance);
119+
switch (delivery.Message.Payload)
120+
{
121+
case "stop":
122+
Context.Stop(Self);
123+
break;
124+
case "ping":
125+
consumerProbe.Tell("pong");
126+
break;
127+
case "passivate":
128+
consumerProbe.Tell("passivate");
129+
Context.Parent.Tell(new Passivate(PoisonPill.Instance));
130+
break;
131+
}
132+
});
133+
}
134+
135+
protected override void PreStart()
136+
{
137+
_consumerController.Tell(new ConsumerController.Start<Job>(Self));
138+
}
139+
}
140+
}

src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ private void WaitForStart()
6969
_log.Debug("Consumer terminated before initialized.");
7070
Context.Stop(Self);
7171
});
72+
73+
Receive<Passivate>(_ => Sender.Equals(_consumer), p =>
74+
{
75+
Context.Parent.Tell(p);
76+
});
7277

7378
ReceiveAny(msg =>
7479
{
@@ -141,6 +146,11 @@ private void Active()
141146
}
142147
}
143148
});
149+
150+
Receive<Passivate>(_ => Sender.Equals(_consumer), p =>
151+
{
152+
Context.Parent.Tell(p);
153+
});
144154

145155
ReceiveAny(msg =>
146156
{

0 commit comments

Comments
 (0)