Skip to content

Implement run retry policy #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ on:
description: name of the test artifact output

jobs:

sdk-test-suite:
if: github.repository_owner == 'restatedev'
runs-on: ubuntu-latest
name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})"
strategy:
matrix:
sdk-test-suite: [ "1.8" ]
sdk-test-suite: [ "2.0" ]
permissions:
contents: read
issues: read
Expand Down
2 changes: 2 additions & 0 deletions buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Generated by buf. DO NOT EDIT.
version: v2
8 changes: 8 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: v2
name: buf.build/restatedev/service-protocol
lint:
use:
- DEFAULT
breaking:
use:
- FILE
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.suspendCancellableCoroutine

internal class ContextImpl internal constructor(private val syscalls: Syscalls) : WorkflowContext {
internal class ContextImpl internal constructor(internal val syscalls: Syscalls) : WorkflowContext {
override fun key(): String {
return this.syscalls.objectKey()
}
Expand Down Expand Up @@ -168,14 +168,11 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

var actionReturnValue: T? = null
var actionFailure: TerminalException? = null
var actionFailure: Throwable? = null
try {
actionReturnValue = block()
} catch (e: TerminalException) {
actionFailure = e
} catch (t: Throwable) {
syscalls.fail(t)
throw CancellationException("Side effect failure", t)
actionFailure = t
}

val exitCallback =
Expand All @@ -189,12 +186,92 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

override fun onCancel(t: Throwable?) {
exitResult.cancel(CancellationException("Suspended", t))
exitResult.cancel(CancellationException(message = null, cause = t))
}
}

if (actionFailure != null) {
syscalls.exitSideEffectBlockWithTerminalException(actionFailure, exitCallback)
syscalls.exitSideEffectBlockWithException(actionFailure, null, exitCallback)
} else {
syscalls.exitSideEffectBlock(
serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback)
}

return serde.deserializeWrappingException(syscalls, exitResult.await())
}

@UsePreviewContext
override suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String,
retryPolicy: RetryPolicy?,
block: suspend () -> T
): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteBuffer>>
->
syscalls.enterSideEffectBlock(
name,
object : EnterSideEffectSyscallCallback {
override fun onSuccess(t: ByteBuffer?) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.complete(t!!)
cont.resume(deferred)
}

override fun onFailure(t: TerminalException) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.completeExceptionally(t)
cont.resume(deferred)
}

override fun onCancel(t: Throwable?) {
cont.cancel(t)
}

override fun onNotExecuted() {
cont.resume(CompletableDeferred())
}
})
}

if (exitResult.isCompleted) {
return serde.deserializeWrappingException(syscalls, exitResult.await())!!
}

var actionReturnValue: T? = null
var actionFailure: Throwable? = null
try {
actionReturnValue = block()
} catch (t: Throwable) {
actionFailure = t
}

val exitCallback =
object : ExitSideEffectSyscallCallback {
override fun onSuccess(t: ByteBuffer?) {
exitResult.complete(t!!)
}

override fun onFailure(t: TerminalException) {
exitResult.completeExceptionally(t)
}

override fun onCancel(t: Throwable?) {
exitResult.cancel(CancellationException(message = null, cause = t))
}
}

if (actionFailure != null) {
val javaRetryPolicy =
retryPolicy?.let {
dev.restate.sdk.common.RetryPolicy.exponential(
it.initialDelay.toJavaDuration(), it.exponentiationFactor)
.setMaxAttempts(it.maxAttempts)
.setMaxDelay(it.maxDelay?.toJavaDuration())
.setMaxDuration(it.maxDuration?.toJavaDuration())
}
syscalls.exitSideEffectBlockWithException(actionFailure, javaRetryPolicy, exitCallback)
} else {
syscalls.exitSideEffectBlock(
serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/** Retry policy configuration. */
@UsePreviewContext
data class RetryPolicy(
/** Initial retry delay for the first retry attempt. */
val initialDelay: Duration,
/** Exponentiation factor to use when computing the next retry delay. */
val exponentiationFactor: Float,
/** Maximum delay between retries. */
val maxDelay: Duration? = null,
/**
* Maximum number of attempts before giving up retrying.
*
* The policy gives up retrying when either at least the given number of attempts is reached, or
* the [maxDuration] (if set) is reached first. If both [maxAttempts] and [maxDuration] are
* `null`, the policy will retry indefinitely.
*
* **Note:** The number of actual retries may be higher than the provided value. This is due to
* the nature of the `run` operation, which executes the closure on the service and sends the
* result afterward to Restate.
*/
val maxAttempts: Int? = null,
/**
* Maximum duration of the retry loop.
*
* The policy gives up retrying when either the retry loop lasted at least for this given max
* duration, or the [maxAttempts] (if set) is reached first. If both [maxAttempts] and
* [maxDuration] are `null`, the policy will retry indefinitely.
*
* **Note:** The real retry loop duration may be higher than the given duration. TThis is due to
* the nature of the `run` operation, which executes the closure on the service and sends the
* result afterward to Restate.
*/
val maxDuration: Duration? = null
) {

@UsePreviewContext
data class Builder
internal constructor(
var initialDelay: Duration = 100.milliseconds,
var exponentiationFactor: Float = 2.0f,
var maxDelay: Duration? = null,
var maxAttempts: Int? = null,
var maxDuration: Duration? = null
) {
fun build() =
RetryPolicy(
initialDelay = initialDelay,
exponentiationFactor = exponentiationFactor,
maxDelay = maxDelay,
maxDuration = maxDuration,
maxAttempts = maxAttempts)
}
}

@UsePreviewContext
fun retryPolicy(init: RetryPolicy.Builder.() -> Unit): RetryPolicy {
val builder = RetryPolicy.Builder()
builder.init()
return builder.build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

/**
* Opt-in annotation to use the preview of new context features.
*
* In order to use these methods, you **MUST enable the preview context**, through the endpoint
* builders using `enablePreviewContext()`.
*/
@RequiresOptIn
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION)
annotation class UsePreviewContext
30 changes: 29 additions & 1 deletion sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.common.*
import dev.restate.sdk.common.DurablePromiseKey
import dev.restate.sdk.common.Output
import dev.restate.sdk.common.Request
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.common.Target
import dev.restate.sdk.common.syscalls.Syscalls
import java.util.*
Expand Down Expand Up @@ -145,6 +149,21 @@ sealed interface Context {
*/
suspend fun <T : Any?> runBlock(serde: Serde<T>, name: String = "", block: suspend () -> T): T

/**
* Like [runBlock], but using a custom retry policy.
*
* When a retry policy is not specified, the `runBlock` will be retried using the
* [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server), which by
* default retries indefinitely.
*/
@UsePreviewContext
suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String = "",
retryPolicy: RetryPolicy? = null,
block: suspend () -> T
): T

/**
* Create an [Awakeable], addressable through [Awakeable.id].
*
Expand Down Expand Up @@ -229,6 +248,15 @@ suspend inline fun <reified T : Any> Context.runBlock(
return this.runBlock(KtSerdes.json(), name, block)
}

@UsePreviewContext
suspend inline fun <reified T : Any> Context.runBlock(
name: String = "",
retryPolicy: RetryPolicy? = null,
noinline block: suspend () -> T
): T {
return this.runBlock(KtSerdes.json(), name, retryPolicy, block)
}

/**
* Create an [Awakeable] using [KtSerdes.json] deserializer, addressable through [Awakeable.id].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import dev.restate.sdk.core.TestDefinitions.TestInvocationBuilder
import dev.restate.sdk.kotlin.KotlinCoroutinesTests.Companion.testDefinitionForService
import java.util.*
import kotlin.coroutines.coroutineContext
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers

Expand Down Expand Up @@ -75,4 +76,24 @@ class SideEffectTest : SideEffectTestSuite() {
testDefinitionForService<Unit, String>("FailingSideEffect") { ctx, _: Unit ->
ctx.runBlock(name) { throw IllegalStateException(reason) }
}

@OptIn(UsePreviewContext::class)
override fun failingSideEffectWithRetryPolicy(
reason: String,
retryPolicy: dev.restate.sdk.common.RetryPolicy?
) =
testDefinitionForService<Unit, String>("FailingSideEffectWithRetryPolicy") { ctx, _: Unit ->
ctx.runBlock(
retryPolicy =
retryPolicy?.let {
RetryPolicy(
initialDelay = it.initialDelay.toKotlinDuration(),
exponentiationFactor = it.exponentiationFactor,
maxDelay = it.maxDelay?.toKotlinDuration(),
maxDuration = it.maxDuration?.toKotlinDuration(),
maxAttempts = it.maxAttempts)
}) {
throw IllegalStateException(reason)
}
}
}
11 changes: 4 additions & 7 deletions sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

class ContextImpl implements ObjectContext, WorkflowContext {

private final Syscalls syscalls;
final Syscalls syscalls;

ContextImpl(Syscalls syscalls) {
this.syscalls = syscalls;
Expand Down Expand Up @@ -163,18 +163,15 @@ public void onCancel(@Nullable Throwable t) {
};

T res = null;
TerminalException failure = null;
Throwable failure = null;
try {
res = action.get();
} catch (TerminalException e) {
failure = e;
} catch (Throwable e) {
syscalls.fail(e);
AbortedExecutionException.sneakyThrow();
failure = e;
}

if (failure != null) {
syscalls.exitSideEffectBlockWithTerminalException(failure, exitCallback);
syscalls.exitSideEffectBlockWithException(failure, null, exitCallback);
} else {
syscalls.exitSideEffectBlock(
Util.serializeWrappingException(syscalls, serde, res), exitCallback);
Expand Down
Loading
Loading