Skip to content

Commit b0f5322

Browse files
committed
feat: workflow rollback types + Python SDK (withRollback, saga pattern)
1 parent 612919d commit b0f5322

File tree

8 files changed

+681
-38
lines changed

8 files changed

+681
-38
lines changed

src/pyodide/internal/workers-api/src/workers/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
Request,
1616
RequestInitCfProperties,
1717
Response,
18+
RetryConfig,
19+
RollbackConfig,
20+
RollbackStep,
21+
StepConfig,
22+
UndoHandler,
1823
WorkerEntrypoint,
1924
WorkflowEntrypoint,
2025
fetch,
@@ -42,6 +47,11 @@
4247
"Request",
4348
"RequestInitCfProperties",
4449
"Response",
50+
"RetryConfig",
51+
"RollbackConfig",
52+
"RollbackStep",
53+
"StepConfig",
54+
"UndoHandler",
4555
"WorkerEntrypoint",
4656
"WorkflowEntrypoint",
4757
"env",

src/pyodide/internal/workers-api/src/workers/_workers.py

Lines changed: 272 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,18 @@
1818
from enum import StrEnum
1919
from http import HTTPMethod, HTTPStatus
2020
from types import LambdaType
21-
from typing import Any, Never, Protocol, TypedDict, Unpack
21+
from typing import (
22+
Any,
23+
Awaitable,
24+
Callable,
25+
Generic,
26+
Literal,
27+
Never,
28+
Protocol,
29+
TypedDict,
30+
TypeVar,
31+
Unpack,
32+
)
2233

2334
# Get globals modules and import function from the entrypoint-helper
2435
import _pyodide_entrypoint_helper
@@ -38,6 +49,34 @@
3849
from pyodide.http import pyfetch
3950
from workers.workflows import NonRetryableError
4051

52+
# Type definitions for workflow steps
53+
T = TypeVar("T")
54+
55+
56+
class RetryConfig(TypedDict, total=False):
57+
"""Configuration for step retry behavior."""
58+
59+
limit: int
60+
delay: str | int
61+
backoff: Literal["constant", "linear", "exponential"]
62+
63+
64+
class StepConfig(TypedDict, total=False):
65+
"""Configuration for workflow step execution."""
66+
67+
retries: RetryConfig
68+
timeout: str | int
69+
70+
71+
class RollbackConfig(TypedDict, total=False):
72+
"""Configuration for workflow rollback behavior at instance creation."""
73+
74+
continue_on_error: bool
75+
76+
77+
# Undo function signature: (err, value) -> Awaitable[None]
78+
UndoHandler = Callable[[Exception | None, T], Awaitable[None]]
79+
4180

4281
class Context(Protocol):
4382
def waitUntil(self, other: Awaitable[Any]) -> None: ...
@@ -1125,6 +1164,115 @@ def wrapper(*args, **kwargs):
11251164
return wrapper
11261165

11271166

1167+
class RollbackStep(Generic[T]):
1168+
"""
1169+
Wrapper returned by @step.with_rollback decorator.
1170+
1171+
Delegates to the engine's withRollback for durable undo stack management.
1172+
1173+
Usage:
1174+
# Pattern A: Chained decorator (preferred - keeps do/undo together)
1175+
@step.with_rollback("save to db")
1176+
async def save():
1177+
return await db.insert(data)
1178+
1179+
@save.undo
1180+
async def _(error, record_id):
1181+
await db.delete(record_id)
1182+
1183+
record_id = await save()
1184+
1185+
# Pattern B: Parameter (for reusable undo handlers)
1186+
@step.with_rollback("save to db", undo=generic_delete)
1187+
async def save():
1188+
return await db.insert(data)
1189+
"""
1190+
1191+
def __init__(
1192+
self,
1193+
step_wrapper: "_WorkflowStepWrapper",
1194+
name: str,
1195+
do_fn: Callable[..., Awaitable[T]],
1196+
*,
1197+
undo: UndoHandler[T] | None = None,
1198+
depends: list[Callable[..., Awaitable[Any]]] | None = None,
1199+
concurrent: bool = False,
1200+
config: StepConfig | None = None,
1201+
undo_config: StepConfig | None = None,
1202+
):
1203+
self._step_wrapper = step_wrapper
1204+
self._name = name
1205+
self._do_fn = do_fn
1206+
self._undo_handler = undo
1207+
self._depends = depends
1208+
self._concurrent = concurrent
1209+
self._config = config
1210+
self._undo_config = undo_config
1211+
self._step_name = name # For dependency resolution
1212+
1213+
def undo(
1214+
self, fn_or_config: UndoHandler[T] | StepConfig | None = None
1215+
) -> UndoHandler[T] | Callable[[UndoHandler[T]], UndoHandler[T]]:
1216+
"""
1217+
Decorator to register an undo/compensation function for this step.
1218+
1219+
The undo function receives (error, value) where value is the result
1220+
of the do function.
1221+
1222+
Args:
1223+
fn_or_config: Either the undo function directly (@fn.undo) or
1224+
a StepConfig dict (@fn.undo(config={...}))
1225+
"""
1226+
# Support @fn.undo (no parens)
1227+
if callable(fn_or_config):
1228+
self._undo_handler = fn_or_config
1229+
return fn_or_config
1230+
1231+
# Support @fn.undo() or @fn.undo(config={...})
1232+
config = fn_or_config
1233+
1234+
def decorator(fn: UndoHandler[T]) -> UndoHandler[T]:
1235+
self._undo_handler = fn
1236+
if config is not None:
1237+
self._undo_config = config
1238+
return fn
1239+
1240+
return decorator
1241+
1242+
async def __call__(self) -> T:
1243+
"""Execute the step via engine's withRollback for durable undo stack."""
1244+
if self._undo_handler is None:
1245+
raise ValueError(
1246+
f"Step '{self._name}' requires an undo handler. "
1247+
f"Add @{self._do_fn.__name__}.undo or pass undo= parameter."
1248+
)
1249+
1250+
# Resolve dependencies (same pattern as step.do)
1251+
if self._concurrent:
1252+
results = await gather(
1253+
*[
1254+
self._step_wrapper._resolve_dependency(dep)
1255+
for dep in self._depends or []
1256+
]
1257+
)
1258+
else:
1259+
results = [
1260+
await self._step_wrapper._resolve_dependency(dep)
1261+
for dep in self._depends or []
1262+
]
1263+
python_results = [python_from_rpc(r) for r in results]
1264+
1265+
return await _withRollback_call(
1266+
self._step_wrapper,
1267+
self._name,
1268+
self._config,
1269+
self._undo_config,
1270+
self._do_fn,
1271+
self._undo_handler,
1272+
*python_results,
1273+
)
1274+
1275+
11281276
class _WorkflowStepWrapper:
11291277
def __init__(self, js_step):
11301278
self._js_step = js_step
@@ -1169,6 +1317,67 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"):
11691317
),
11701318
)
11711319

1320+
def with_rollback(
1321+
self,
1322+
name: str,
1323+
*,
1324+
undo: UndoHandler[T] | None = None,
1325+
depends: list[Callable[..., Awaitable[Any]]] | None = None,
1326+
concurrent: bool = False,
1327+
config: StepConfig | None = None,
1328+
undo_config: StepConfig | None = None,
1329+
) -> Callable[[Callable[..., Awaitable[T]]], RollbackStep[T]]:
1330+
"""
1331+
Decorator for step with rollback/compensation support (saga pattern).
1332+
1333+
Returns a callable wrapper that allows attaching an .undo decorator for
1334+
compensation logic. Undo functions execute automatically in LIFO order
1335+
when the workflow throws an uncaught error (if rollback config is enabled
1336+
at instance creation).
1337+
1338+
Args:
1339+
name: Step name (up to 256 chars)
1340+
undo: Undo handler, or use @decorated_fn.undo
1341+
depends: Steps this depends on (DAG pattern)
1342+
concurrent: Run dependencies in parallel
1343+
config: Retry/timeout config for do()
1344+
undo_config: Retry/timeout config for undo()
1345+
1346+
Raises:
1347+
ValueError: If no undo handler provided via parameter or decorator
1348+
1349+
Usage:
1350+
# Pattern A: Chained decorator (preferred)
1351+
@step.with_rollback("save to db")
1352+
async def save():
1353+
return await db.insert(data)
1354+
1355+
@save.undo
1356+
async def _(error, record_id):
1357+
await db.delete(record_id)
1358+
1359+
record_id = await save()
1360+
1361+
# Pattern B: Parameter (for reusable undo handlers)
1362+
@step.with_rollback("save to db", undo=generic_delete)
1363+
async def save():
1364+
return await db.insert(data)
1365+
"""
1366+
1367+
def decorator(func: Callable[..., Awaitable[T]]) -> RollbackStep[T]:
1368+
return RollbackStep(
1369+
self,
1370+
name,
1371+
func,
1372+
undo=undo,
1373+
depends=depends,
1374+
concurrent=concurrent,
1375+
config=config,
1376+
undo_config=undo_config,
1377+
)
1378+
1379+
return decorator
1380+
11721381
async def _resolve_dependency(self, dep):
11731382
if dep._step_name in self._memoized_dependencies:
11741383
return self._memoized_dependencies[dep._step_name]
@@ -1211,6 +1420,68 @@ async def _closure():
12111420
return result
12121421

12131422

1423+
async def _withRollback_call(
1424+
entrypoint, name, config, undo_config, do_fn, undo_fn, *dep_results
1425+
):
1426+
"""Call the engine's withRollback with Python callbacks wrapped for JS."""
1427+
1428+
async def _closure():
1429+
async def _do_callback():
1430+
result = do_fn(*dep_results)
1431+
if inspect.iscoroutine(result):
1432+
result = await result
1433+
return to_js(result, dict_converter=Object.fromEntries)
1434+
1435+
async def _undo_callback(js_err, js_value):
1436+
py_err = None
1437+
if js_err is not None:
1438+
py_err = (
1439+
_from_js_error(js_err) if hasattr(js_err, "message") else js_err
1440+
)
1441+
1442+
py_value = python_from_rpc(js_value)
1443+
1444+
result = undo_fn(py_err, py_value)
1445+
if inspect.iscoroutine(result):
1446+
await result
1447+
1448+
handler = {"do": _do_callback}
1449+
if undo_fn is not None:
1450+
handler["undo"] = _undo_callback
1451+
1452+
js_handler = to_js(handler, dict_converter=Object.fromEntries)
1453+
1454+
js_config = None
1455+
if config is not None or undo_config is not None:
1456+
config_dict = dict(config) if config else {}
1457+
if undo_config is not None:
1458+
config_dict["undoConfig"] = undo_config
1459+
js_config = to_js(config_dict, dict_converter=Object.fromEntries)
1460+
1461+
try:
1462+
if js_config is None:
1463+
result = await entrypoint._js_step.withRollback(name, js_handler)
1464+
else:
1465+
result = await entrypoint._js_step.withRollback(
1466+
name, js_handler, js_config
1467+
)
1468+
1469+
return python_from_rpc(result)
1470+
except Exception as exc:
1471+
raise _from_js_error(exc) from exc
1472+
1473+
task = create_task(_closure())
1474+
entrypoint._in_flight[name] = task
1475+
1476+
try:
1477+
result = await task
1478+
entrypoint._memoized_dependencies[name] = result
1479+
finally:
1480+
del entrypoint._in_flight[name]
1481+
1482+
return result
1483+
1484+
12141485
def _wrap_subclass(cls):
12151486
# Override the class __init__ so that we can wrap the `env` in the constructor.
12161487
original_init = cls.__init__

src/workerd/server/tests/python/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ py_wd_test("python-rpc")
5151

5252
py_wd_test("workflow-entrypoint")
5353

54+
py_wd_test("workflow-rollback")
55+
5456
py_wd_test("vendor_dir_compat_flag")
5557

5658
py_wd_test("default-class-with-legacy-global-handlers")

0 commit comments

Comments
 (0)