Skip to content

Commit f67e123

Browse files
DantharAarononthewebArkatufus
authored
Feature/7582 (#7595)
* some comment corrections and persistence config extension * SupervisorStrategyConfigurator integration/usage and comments * doc tweaks * Docs * Config spec and comments * testcase that strat is actually applied * update test to use custom strategy * more docs * syntax highlight * todo removal * Fix DocFX warnings * Fix documentation typos --------- Co-authored-by: Aaron Stannard <[email protected]> Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 49de2de commit f67e123

File tree

8 files changed

+172
-51
lines changed

8 files changed

+172
-51
lines changed

docs/articles/persistence/custom-persistence-provider.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ akka.persistence.journal-plugin-fallback {
217217
# Dispatcher for message replay.
218218
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
219219
220+
# journal supervisor strategy used.
221+
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
222+
# by default it restarts the journal on crash
223+
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"
224+
220225
# Default serializer used as manifest serializer when applicable
221226
# and payload serializer when no specific binding overrides are specified
222227
serializer = "json"
@@ -406,6 +411,11 @@ akka.persistence.snapshot-store-plugin-fallback {
406411
# Dispatcher for the plugin actor.
407412
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
408413
414+
# snapshot-store supervisor strategy used.
415+
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
416+
# by default it restarts the snapshot-store on crash
417+
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"
418+
409419
# Default serializer used as manifest serializer when applicable
410420
# and payload serializer when no specific binding overrides are specified
411421
serializer = "json"

docs/articles/persistence/storage-plugins.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,49 @@ akka {
3636
}
3737
}
3838
```
39+
40+
### Controlling Journal or Snapshot Crash Behavior
41+
42+
By default the base implementations upon which all journal or snapshot-store implementations are build upon provides out of the box behavior for dealing with errors that occur during the writing or reading of data from the underlying store. Errors that occur will be communicated with the persistent actor that is using them at that time.
43+
So in general once started successfully the journal or snapshot-store will be ready and available for the duration of your application, and won't crash. However in the case they do crash, due to unforeseen circumstances the default behavior is to immediately restart them. This is generally the behavior you want.
44+
But in case you do want to customize how the system handles the crashing of the journal or snapshot-store. You can specify your own supervision strategy using the `supervisor-strategy` property.
45+
This class needs to inherit from `Akka.Actor.SupervisorStrategyConfigurator` and have a parameter-less constructor.
46+
Configuration example:
47+
48+
```hocon
49+
akka {
50+
persistence {
51+
journal {
52+
plugin = "akka.persistence.journal.sqlite"
53+
auto-start-journals = ["akka.persistence.journal.sqlite"]
54+
supervisor-strategy = "My.full.namespace.CustomSupervisorStrategyConfigurator"
55+
}
56+
snapshot-store {
57+
plugin = "akka.persistence.snapshot-store.sqlite"
58+
auto-start-snapshot-stores = ["akka.persistence.snapshot-store.sqlite"]
59+
supervisor-strategy = "My.full.namespace.CustomSupervisorStrategyConfigurator"
60+
}
61+
}
62+
}
63+
```
64+
65+
One such case could be to detect and handle misconfigured application settings during startup. For example if your using a SQL based journal and you misconfigured the connection string you might opt to return a supervision strategy that detects certain network connection errors, and after a few retries signals your application to shutdown instead of continue running with a journal or snapshot-store that in all likelihood will never be able to recover, forever stuck in a restart loop while your application is running.
66+
67+
An example of what this could look like is this:
68+
69+
```csharp
70+
71+
public class MyCustomSupervisorConfigurator : SupervisorStrategyConfigurator
72+
{
73+
public override SupervisorStrategy Create()
74+
{
75+
//optionally only stop if the error occurs more then x times in y period
76+
//this will be highly likely if its an unrecoverable error during start/init of the journal/snapshot store
77+
return new OneForOneStrategy(10,TimeSpan.FromSeconds(5),ex =>
78+
{
79+
//detect unrecoverable exception here
80+
return Directive.Stop;
81+
});
82+
}
83+
}
84+
```

src/core/Akka.Persistence.Tests/PersistenceConfigSpec.cs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55
// </copyright>
66
//-----------------------------------------------------------------------
77

8+
using System;
9+
using System.Collections.Generic;
810
using Akka.Actor;
11+
using Akka.Actor.Internal;
912
using Akka.Configuration;
1013
using Akka.Persistence.Journal;
1114
using Akka.Persistence.Snapshot;
1215
using Akka.TestKit;
16+
using Akka.Util;
1317
using Xunit;
1418
using Xunit.Abstractions;
1519

@@ -68,6 +72,32 @@ protected internal override bool AroundReceive(Receive receive, object message)
6872
}
6973
}
7074

75+
public class TestSupervisorConfigurator : SupervisorStrategyConfigurator
76+
{
77+
public override SupervisorStrategy Create()
78+
{
79+
return new CustomStrategy(10,TimeSpan.FromSeconds(5),ex =>
80+
{
81+
//detect unrecoverable exception here
82+
return Directive.Stop;
83+
});
84+
}
85+
}
86+
87+
public class CustomStrategy : OneForOneStrategy
88+
{
89+
public CustomStrategy(int? maxNrOfRetries, TimeSpan? withinTimeRange, Func<Exception, Directive> localOnlyDecider) : base(maxNrOfRetries, withinTimeRange, localOnlyDecider)
90+
{
91+
}
92+
93+
public override void HandleChildTerminated(IActorContext actorContext, IActorRef child, IEnumerable<IInternalActorRef> children)
94+
{
95+
//because the journal does not has child actors, the ref is always the actor itself. So optionally do something special here
96+
//to indicate to the system that the journal crashed in an unrecoverable way.
97+
}
98+
}
99+
100+
71101
#endregion
72102

73103
private static readonly string SpecConfig = @"
@@ -82,6 +112,12 @@ class = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestJournal, Akka.Persist
82112
plugin-dispatcher = ""akka.actor.default-dispatcher""
83113
test-value = ""B""
84114
}
115+
test3 {
116+
class = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestJournal, Akka.Persistence.Tests""
117+
plugin-dispatcher = ""akka.actor.default-dispatcher""
118+
test-value = ""B""
119+
supervisor-strategy = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestSupervisorConfigurator, Akka.Persistence.Tests""
120+
}
85121
}
86122
akka.persistence.snapshot-store {
87123
test1 {
@@ -100,6 +136,51 @@ public PersistenceConfigSpec(ITestOutputHelper output) : base(SpecConfig, output
100136
{
101137
}
102138

139+
/// <summary>
140+
/// Verify that the journal config contains the expected default from our fallback configs
141+
/// No spec for when the user overrides that because its not the goal to test the hocon config system.
142+
/// Merely that the plugin system here properly applies the fallback config for this config value.
143+
/// </summary>
144+
[Fact]
145+
public void Journal_has_supervision_strategy_configured()
146+
{
147+
var persistence = Persistence.Instance.Apply(Sys);
148+
149+
var config = persistence.JournalConfigFor("akka.persistence.journal.test2");
150+
var defaultstrategy = config.GetString("supervisor-strategy");
151+
defaultstrategy.ShouldBe(typeof(Akka.Actor.DefaultSupervisorStrategy).FullName);
152+
}
153+
154+
/// <summary>
155+
/// Verify that the snapshot config contains the expected default from our fallback configs
156+
/// No spec for when the user overrides that because its not the goal to test the hocon config system.
157+
/// Merely that the plugin system here properly applies the fallback config for this config value.
158+
/// </summary>
159+
[Fact]
160+
public void Snapshot_has_supervision_strategy_configured()
161+
{
162+
var persistence = Persistence.Instance.Apply(Sys);
163+
164+
var config = persistence.JournalConfigFor("akka.persistence.snapshot-store.test1");
165+
var defaultstrategy = config.GetString("supervisor-strategy");
166+
defaultstrategy.ShouldBe(typeof(Akka.Actor.DefaultSupervisorStrategy).FullName);
167+
}
168+
169+
[Fact]
170+
public void Journal_has_custom_supervision_strategy_applied()
171+
{
172+
var persistence = Persistence.Instance.Apply(Sys);
173+
var journal = persistence.JournalFor("akka.persistence.journal.test3"); //get our journal with the custom configuration
174+
175+
//waves magic wand
176+
var magicref = journal as ActorRefWithCell;
177+
var appliedStrat = magicref.Underlying.Props.SupervisorStrategy;
178+
//because the configured value for our supervisor strategy is our CustomStrategy
179+
//we verify that the strat returned is the same as currently applied
180+
var customstrategy = new TestSupervisorConfigurator().Create();
181+
appliedStrat.GetType().ShouldBe(customstrategy.GetType());
182+
}
183+
103184
[Fact]
104185
public void Persistence_should_use_inmem_journal_by_default()
105186
{

src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,7 @@ protected sealed override bool Receive(object message)
189189
{
190190
return ReceiveWriteJournal(message) || ReceivePluginInternal(message);
191191
}
192-
193-
/// <summary>
194-
/// TBD
195-
/// </summary>
196-
/// <param name="message">TBD</param>
197-
/// <returns>TBD</returns>
192+
198193
protected bool ReceiveWriteJournal(object message)
199194
{
200195
switch (message)

src/core/Akka.Persistence/Journal/WriteJournal.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
namespace Akka.Persistence.Journal
1515
{
1616
/// <summary>
17-
/// TBD
17+
/// Base class for the journal persistence
1818
/// </summary>
1919
public abstract class WriteJournalBase : ActorBase
2020
{
@@ -30,10 +30,11 @@ protected WriteJournalBase()
3030
}
3131

3232
/// <summary>
33-
/// TBD
33+
/// Creates a sequence of write actions to be executed based on the given messages.
34+
/// Applies any registered EventAdapters to the payloads.
3435
/// </summary>
35-
/// <param name="resequenceables">TBD</param>
36-
/// <returns>TBD</returns>
36+
/// <param name="resequenceables">list of messages to write</param>
37+
/// <returns></returns>
3738
protected IEnumerable<AtomicWrite> PreparePersistentBatch(IEnumerable<IPersistentEnvelope> resequenceables)
3839
{
3940
foreach (var resequenceable in resequenceables)
@@ -53,7 +54,7 @@ protected IEnumerable<AtomicWrite> PreparePersistentBatch(IEnumerable<IPersisten
5354
}
5455

5556
/// <summary>
56-
/// INTERNAL API
57+
/// Apply registered eventadapter to the data payload
5758
/// </summary>
5859
[InternalApi]
5960
protected IEnumerable<IPersistentRepresentation> AdaptFromJournal(IPersistentRepresentation representation)
@@ -65,7 +66,7 @@ protected IEnumerable<IPersistentRepresentation> AdaptFromJournal(IPersistentRep
6566
}
6667

6768
/// <summary>
68-
/// INTERNAL API
69+
/// Apply any registered eventadapter to the data payload
6970
/// </summary>
7071
protected IPersistentRepresentation AdaptToJournal(IPersistentRepresentation representation)
7172
{

src/core/Akka.Persistence/Persistence.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,17 @@ private static IActorRef CreatePlugin(ExtendedActorSystem system, string configP
302302
var pluginType = Type.GetType(pluginTypeName, true);
303303
var pluginDispatcherId = pluginConfig.GetString("plugin-dispatcher", null);
304304
object[] pluginActorArgs = pluginType.GetConstructor(new[] { typeof(Config) }) != null ? new object[] { pluginConfig } : null;
305-
var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId);
306-
305+
306+
//todo wrap in backoffsupervisor ?
307+
308+
//supervisor-strategy is defined by default in the fallback configs. So we always expect to get a value here even if the user has not explicitly defined anything
309+
var configurator = SupervisorStrategyConfigurator.CreateConfigurator(pluginConfig.GetString("supervisor-strategy"));
310+
311+
var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId).WithSupervisorStrategy(configurator.Create());
312+
307313
return system.SystemActorOf(pluginActorProps, pluginActorName);
308314
}
309-
315+
310316
private static EventAdapters CreateAdapters(ExtendedActorSystem system, string configPath)
311317
{
312318
var pluginConfig = system.Settings.Config.GetConfig(configPath);

src/core/Akka.Persistence/persistence.conf

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,11 @@ akka.persistence {
114114

115115
# Dispatcher for message replay.
116116
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
117-
117+
# journal supervisor strategy used.
118+
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
119+
# by default it restarts the journal on crash
120+
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"
121+
118122
# Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
119123
serializer = "json"
120124

@@ -173,7 +177,12 @@ akka.persistence {
173177

174178
# Dispatcher for the plugin actor.
175179
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
176-
180+
181+
# snapshot supervisor strategy used.
182+
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
183+
# by default it restarts the snapshot on crash
184+
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"
185+
177186
# Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
178187
serializer = "json"
179188
circuit-breaker {

src/core/Akka/Actor/SupervisorStrategy.cs

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -615,15 +615,7 @@ protected override Directive Handle(IActorRef child, Exception exception)
615615
return Decider.Decide(exception);
616616
}
617617

618-
/// <summary>
619-
/// TBD
620-
/// </summary>
621-
/// <param name="context">TBD</param>
622-
/// <param name="restart">TBD</param>
623-
/// <param name="child">TBD</param>
624-
/// <param name="cause">TBD</param>
625-
/// <param name="stats">TBD</param>
626-
/// <param name="children">TBD</param>
618+
/// <inheritdoc/>
627619
public override void ProcessFailure(IActorContext context, bool restart, IActorRef child, Exception cause, ChildRestartStats stats, IReadOnlyCollection<ChildRestartStats> children)
628620
{
629621
if (children.Count > 0)
@@ -645,12 +637,7 @@ public override void ProcessFailure(IActorContext context, bool restart, IActorR
645637
}
646638
}
647639

648-
/// <summary>
649-
/// TBD
650-
/// </summary>
651-
/// <param name="actorContext">TBD</param>
652-
/// <param name="child">TBD</param>
653-
/// <param name="children">TBD</param>
640+
/// <inheritdoc/>
654641
public override void HandleChildTerminated(IActorContext actorContext, IActorRef child, IEnumerable<IInternalActorRef> children)
655642
{
656643
//Intentionally left blank
@@ -998,14 +985,10 @@ public override int GetHashCode()
998985
}
999986

1000987
/// <summary>
1001-
/// TBD
988+
/// Base configurator class used for configuring the guardian-supervisor-strategy
1002989
/// </summary>
1003990
public abstract class SupervisorStrategyConfigurator
1004991
{
1005-
/// <summary>
1006-
/// TBD
1007-
/// </summary>
1008-
/// <returns>TBD</returns>
1009992
public abstract SupervisorStrategy Create();
1010993

1011994
/// <summary>
@@ -1037,30 +1020,20 @@ public static SupervisorStrategyConfigurator CreateConfigurator(string typeName)
10371020
}
10381021
}
10391022

1040-
/// <summary>
1041-
/// TBD
1042-
/// </summary>
1023+
10431024
public class DefaultSupervisorStrategy : SupervisorStrategyConfigurator
10441025
{
1045-
/// <summary>
1046-
/// TBD
1047-
/// </summary>
1048-
/// <returns>TBD</returns>
1026+
10491027
public override SupervisorStrategy Create()
10501028
{
10511029
return SupervisorStrategy.DefaultStrategy;
10521030
}
10531031
}
10541032

1055-
/// <summary>
1056-
/// TBD
1057-
/// </summary>
1033+
10581034
public class StoppingSupervisorStrategy : SupervisorStrategyConfigurator
10591035
{
1060-
/// <summary>
1061-
/// TBD
1062-
/// </summary>
1063-
/// <returns>TBD</returns>
1036+
10641037
public override SupervisorStrategy Create()
10651038
{
10661039
return SupervisorStrategy.StoppingStrategy;

0 commit comments

Comments
 (0)