Skip to content

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented May 6, 2025

Changes

Introduce CancellationToken to WithCircuitBreaker

The changes to CircuitBreaker is backward compatible, but the changes to Akka.Persistence is not, because these methods need to follow the new changes of the delegate function.

Warning

Breaking Change

Because the change was done to the delegate method fingerprint, it is impossible to make this change backward compatible.

All plugins that is based on Akka.Persistence will need to be updated to use these new Akka.Persistence internal APIs

  • Akka.Persistence.Sql
  • Akka.Persistence.MongoDb
  • Akka.Persistence.Azure
  • Akka.Persistence.Redis

@Arkatufus Arkatufus marked this pull request as draft May 7, 2025 14:37
@Aaronontheweb
Copy link
Member

I checked the build warning counts to make sure that the obsoletions introduced in this PR don't create new warnings:

  • dev: 233
  • this PR: 233

So we're good to go there.

@Arkatufus Arkatufus marked this pull request as ready for review May 7, 2025 16:07
Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-review

/// <returns>TBD</returns>
Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr);
Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -88,7 +90,7 @@ protected AsyncWriteJournal()
public abstract Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback);

/// <inheritdoc/>
public abstract Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr);
public abstract Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

/// <returns>TBD</returns>
protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages);
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

/// <returns>TBD</returns>
protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr);
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +222 to +223
await _breaker.WithCircuitBreaker((message, awj: this), (state, ct) =>
state.awj.DeleteMessagesToAsync(state.message.PersistenceId, state.message.ToSequenceNr, ct));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example of how CircuitBreaker will pass in its CancellationToken to the method it is protecting as a cancellation signal.

Comment on lines +242 to +245
protected abstract Task<SelectedSnapshot> LoadAsync(
string persistenceId,
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

Comment on lines +254 to +258
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task SaveAsync(
SnapshotMetadata metadata,
object snapshot,
CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

/// <returns>TBD</returns>
protected abstract Task DeleteAsync(SnapshotMetadata metadata);
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

Comment on lines +277 to +280
protected abstract Task DeleteAsync(
string persistenceId,
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Breaking change] Akka.Persistence codes that are being wrapped inside a CircuitBreaker will now take a CancellationToken as a signal that the operation has been timed out by the CircuitBreaker.

/// <typeparam name="T">The <see cref="Type"/> returned by the protected function</typeparam>
/// <param name="body">Call needing protected</param>
/// <returns><see cref="Task"/> containing the call result</returns>
public Task<T> WithCircuitBreaker<T>(Func<CancellationToken, Task<T>> body) => CurrentState.Invoke(body);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All .WithCircuitBreaker() methods now accepts delegate function with a CancellationToken as one of its arguments, this CancellationToken will be used to pass in cancellation signal downstream into the protected delegate function if the CircuitBreaker throws.

CallSucceeds();
}
catch (Exception ex)
{
cts.Cancel(); // Signal the protected delegate that operation has been canceled
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancellation signal is sent to the underlying protected function here.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunate but necessary breaking change IMHO

/// <returns>TBD</returns>
Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr);
Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

config.GetTimeSpan("circuit-breaker.reset-timeout", null));
config.GetInt("circuit-breaker.max-failures", 10),
config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)),
config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems high IMHO

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we can address that in a later issue if it is a problem - in fact I think 30s is the current default, isn't it?

@@ -88,7 +90,7 @@ protected AsyncWriteJournal()
public abstract Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback);

/// <inheritdoc/>
public abstract Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr);
public abstract Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

/// <returns>TBD</returns>
protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages);
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

/// <returns>TBD</returns>
protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr);
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -59,155 +61,166 @@ private bool ReceiveSnapshotStore(object message)
var senderPersistentActor = Sender; // Sender is PersistentActor
var self = Self; //Self MUST BE CLOSED OVER here, or the code below will be subject to race conditions

if (message is LoadSnapshot loadSnapshot)
switch (message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb
Copy link
Member

@Arkatufus just to validate that this design is going to fly with real plugins in the real world, can you do a fork using a local / testlab build of Akka.NET v1.5.42 and integrate it into something like Akka.Persistence.Sql or MongoDb?

We just want to validate that these changes can actually permeate down to the database drivers and do what we expect / hope (i.e. roll back a timed-out TXN in Akka.Persistence.Sql, for instance.)

No need for crazy unit or integration tests to validate this - that would be a gigantic waste of time. A quick "let's run a sample application with this change integrated into a real plugin and see what happens" would be fine.

@Arkatufus
Copy link
Contributor Author

Will do

@Aaronontheweb
Copy link
Member

Validated POC here akkadotnet/Akka.Persistence.Sql#535

@Aaronontheweb Aaronontheweb merged commit 0b91a04 into akkadotnet:dev May 12, 2025
11 checks passed
@Arkatufus Arkatufus added this to the 1.5.42 milestone May 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants