Skip to content

Commit 66d8364

Browse files
Discovery manifest v3
1 parent 844d864 commit 66d8364

File tree

6 files changed

+552
-48
lines changed

6 files changed

+552
-48
lines changed

python/restate/discovery.py

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,36 @@ def __init__(self, setContentTypeIfEmpty: bool, contentType: Optional[str] = Non
6060
self.jsonSchema = jsonSchema
6161

6262
class Handler:
63-
def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload | Dict[str, str]] = None, output: Optional[OutputPayload] = None, description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None):
63+
# pylint: disable=R0902
64+
def __init__(self, name: str, ty: Optional[ServiceHandlerType] = None, input: Optional[InputPayload | Dict[str, str]] = None, output: Optional[OutputPayload] = None, description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, workflowCompletionRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None):
6465
self.name = name
6566
self.ty = ty
6667
self.input = input
6768
self.output = output
6869
self.documentation = description
6970
self.metadata = metadata
71+
self.inactivityTimeout = inactivityTimeout
72+
self.abortTimeout = abortTimeout
73+
self.journalRetention = journalRetention
74+
self.idempotencyRetention = idempotencyRetention
75+
self.workflowCompletionRetention = workflowCompletionRetention
76+
self.enableLazyState = enableLazyState
77+
self.ingressPrivate = ingressPrivate
7078

7179
class Service:
72-
def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None):
80+
# pylint: disable=R0902
81+
def __init__(self, name: str, ty: ServiceType, handlers: List[Handler], description: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, inactivityTimeout: Optional[int] = None, abortTimeout: Optional[int] = None, journalRetention: Optional[int] = None, idempotencyRetention: Optional[int] = None, enableLazyState: Optional[bool] = None, ingressPrivate: Optional[bool] = None):
7382
self.name = name
7483
self.ty = ty
7584
self.handlers = handlers
7685
self.documentation = description
7786
self.metadata = metadata
87+
self.inactivityTimeout = inactivityTimeout
88+
self.abortTimeout = abortTimeout
89+
self.journalRetention = journalRetention
90+
self.idempotencyRetention = idempotencyRetention
91+
self.enableLazyState = enableLazyState
92+
self.ingressPrivate = ingressPrivate
7893

7994
class Endpoint:
8095
def __init__(self, protocolMode: ProtocolMode, minProtocolVersion: int, maxProtocolVersion: int, services: List[Service]):
@@ -148,20 +163,51 @@ def json_schema_from_type_hint(type_hint: Optional[TypeHint[Any]]) -> Any:
148163
return type_hint_to_json_schema(type_hint.annotation)
149164

150165

151-
166+
# pylint: disable=R0912
152167
def compute_discovery_json(endpoint: RestateEndpoint,
153168
version: int,
154-
discovered_as: typing.Literal["bidi", "request_response"]) -> typing.Tuple[typing.Dict[str, str] ,str]:
169+
discovered_as: typing.Literal["bidi", "request_response"]) -> str:
155170
"""
156-
return restate's discovery object as JSON
171+
return restate's discovery object as JSON
157172
"""
158-
if version != 1:
159-
raise ValueError(f"Unsupported protocol version {version}")
160173

161174
ep = compute_discovery(endpoint, discovered_as)
175+
176+
# Validate that new discovery fields aren't used with older protocol versions
177+
if version <= 2:
178+
# Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower
179+
for service in ep.services:
180+
if service.inactivityTimeout is not None:
181+
raise ValueError("inactivityTimeout is only supported in discovery protocol version 3")
182+
if service.abortTimeout is not None:
183+
raise ValueError("abortTimeout is only supported in discovery protocol version 3")
184+
if service.idempotencyRetention is not None:
185+
raise ValueError("idempotencyRetention is only supported in discovery protocol version 3")
186+
if service.journalRetention is not None:
187+
raise ValueError("journalRetention is only supported in discovery protocol version 3")
188+
if service.enableLazyState is not None:
189+
raise ValueError("enableLazyState is only supported in discovery protocol version 3")
190+
if service.ingressPrivate is not None:
191+
raise ValueError("ingressPrivate is only supported in discovery protocol version 3")
192+
193+
for handler in service.handlers:
194+
if handler.inactivityTimeout is not None:
195+
raise ValueError("inactivityTimeout is only supported in discovery protocol version 3")
196+
if handler.abortTimeout is not None:
197+
raise ValueError("abortTimeout is only supported in discovery protocol version 3")
198+
if handler.idempotencyRetention is not None:
199+
raise ValueError("idempotencyRetention is only supported in discovery protocol version 3")
200+
if handler.journalRetention is not None:
201+
raise ValueError("journalRetention is only supported in discovery protocol version 3")
202+
if handler.workflowCompletionRetention is not None:
203+
raise ValueError("workflowCompletionRetention is only supported in discovery protocol version 3")
204+
if handler.enableLazyState is not None:
205+
raise ValueError("enableLazyState is only supported in discovery protocol version 3")
206+
if handler.ingressPrivate is not None:
207+
raise ValueError("ingressPrivate is only supported in discovery protocol version 3")
208+
162209
json_str = json.dumps(ep, cls=PythonClassEncoder, allow_nan=False)
163-
headers = {"content-type": "application/vnd.restate.endpointmanifest.v1+json"}
164-
return (headers, json_str)
210+
return json_str
165211

166212

167213
def compute_discovery(endpoint: RestateEndpoint, discovered_as : typing.Literal["bidi", "request_response"]) -> Endpoint:
@@ -200,11 +246,28 @@ def compute_discovery(endpoint: RestateEndpoint, discovered_as : typing.Literal[
200246
input=inp,
201247
output=out,
202248
description=handler.description,
203-
metadata=handler.metadata))
249+
metadata=handler.metadata,
250+
inactivityTimeout=int(handler.inactivity_timeout.total_seconds() * 1000) if handler.inactivity_timeout else None,
251+
abortTimeout=int(handler.abort_timeout.total_seconds() * 1000) if handler.abort_timeout else None,
252+
journalRetention=int(handler.journal_retention.total_seconds() * 1000) if handler.journal_retention else None,
253+
idempotencyRetention=int(handler.idempotency_retention.total_seconds() * 1000) if handler.idempotency_retention else None,
254+
workflowCompletionRetention=int(handler.workflow_retention.total_seconds() * 1000) if handler.workflow_retention else None,
255+
enableLazyState=handler.enable_lazy_state,
256+
ingressPrivate=handler.ingress_private))
204257
# add the service
205258
description = service.service_tag.description
206259
metadata = service.service_tag.metadata
207-
services.append(Service(name=service.name, ty=service_type, handlers=service_handlers, description=description, metadata=metadata))
260+
services.append(Service(name=service.name,
261+
ty=service_type,
262+
handlers=service_handlers,
263+
description=description,
264+
metadata=metadata,
265+
inactivityTimeout=int(service.inactivity_timeout.total_seconds() * 1000) if service.inactivity_timeout else None,
266+
abortTimeout=int(service.abort_timeout.total_seconds() * 1000) if service.abort_timeout else None,
267+
journalRetention=int(service.journal_retention.total_seconds() * 1000) if service.journal_retention else None,
268+
idempotencyRetention=int(service.idempotency_retention.total_seconds() * 1000) if service.idempotency_retention else None,
269+
enableLazyState=service.enable_lazy_state if hasattr(service, 'enable_lazy_state') else None,
270+
ingressPrivate=service.ingress_private))
208271

209272
if endpoint.protocol:
210273
protocol_mode = PROTOCOL_MODES[endpoint.protocol]

python/restate/handler.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"""
1717

1818
from dataclasses import dataclass
19+
from datetime import timedelta
1920
from inspect import Signature
2021
from typing import Any, Callable, Awaitable, Dict, Generic, Literal, Optional, TypeVar
2122

@@ -104,6 +105,23 @@ def update_handler_io_with_type_hints(handler_io: HandlerIO[I, O], signature: Si
104105
class Handler(Generic[I, O]):
105106
"""
106107
Represents a handler for a service.
108+
109+
Attributes:
110+
service_tag: The service tag for the handler.
111+
handler_io: The input/output configuration for the handler.
112+
kind: The kind of handler (exclusive, shared, workflow).
113+
name: The name of the handler.
114+
fn: The handler function.
115+
arity: The number of parameters in the handler function.
116+
description: Documentation for this handler definition.
117+
metadata: Custom metadata for this handler definition.
118+
inactivity_timeout: Inactivity timeout duration.
119+
abort_timeout: Abort timeout duration.
120+
journal_retention: Journal retention duration.
121+
idempotency_retention: Idempotency retention duration.
122+
workflow_retention: Workflow completion retention duration.
123+
enable_lazy_state: If true, lazy state is enabled.
124+
ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress.
107125
"""
108126
service_tag: ServiceTag
109127
handler_io: HandlerIO[I, O]
@@ -113,21 +131,52 @@ class Handler(Generic[I, O]):
113131
arity: int
114132
description: Optional[str] = None
115133
metadata: Optional[Dict[str, str]] = None
134+
inactivity_timeout: Optional[timedelta] = None
135+
abort_timeout: Optional[timedelta] = None
136+
journal_retention: Optional[timedelta] = None
137+
idempotency_retention: Optional[timedelta] = None
138+
workflow_retention: Optional[timedelta] = None
139+
enable_lazy_state: Optional[bool] = None
140+
ingress_private: Optional[bool] = None
116141

117142

118143
# disable too many arguments warning
119144
# pylint: disable=R0913
120-
145+
# pylint: disable=R0914
121146
def make_handler(service_tag: ServiceTag,
122147
handler_io: HandlerIO[I, O],
123148
name: str | None,
124149
kind: Optional[Literal["exclusive", "shared", "workflow"]],
125150
wrapped: Any,
126151
signature: Signature,
127152
description: Optional[str] = None,
128-
metadata: Optional[Dict[str, str]] = None) -> Handler[I, O]:
153+
metadata: Optional[Dict[str, str]] = None,
154+
inactivity_timeout: Optional[timedelta] = None,
155+
abort_timeout: Optional[timedelta] = None,
156+
journal_retention: Optional[timedelta] = None,
157+
idempotency_retention: Optional[timedelta] = None,
158+
workflow_retention: Optional[timedelta] = None,
159+
enable_lazy_state: Optional[bool] = None,
160+
ingress_private: Optional[bool] = None) -> Handler[I, O]:
129161
"""
130162
Factory function to create a handler.
163+
164+
Args:
165+
service_tag: The service tag for the handler.
166+
handler_io: The input/output configuration for the handler.
167+
name: The name of the handler.
168+
kind: The kind of handler (exclusive, shared, workflow).
169+
wrapped: The wrapped function.
170+
signature: The signature of the function.
171+
description: Documentation for this handler definition.
172+
metadata: Custom metadata for this handler definition.
173+
inactivity_timeout: Inactivity timeout duration.
174+
abort_timeout: Abort timeout duration.
175+
journal_retention: Journal retention duration.
176+
idempotency_retention: Idempotency retention duration.
177+
workflow_retention: Workflow completion retention duration.
178+
enable_lazy_state: If true, lazy state is enabled.
179+
ingress_private: If true, the handler cannot be invoked from the HTTP nor Kafka ingress.
131180
"""
132181
# try to deduce the handler name
133182
handler_name = name
@@ -149,7 +198,14 @@ def make_handler(service_tag: ServiceTag,
149198
fn=wrapped,
150199
arity=arity,
151200
description=description,
152-
metadata=metadata)
201+
metadata=metadata,
202+
inactivity_timeout=inactivity_timeout,
203+
abort_timeout=abort_timeout,
204+
journal_retention=journal_retention,
205+
idempotency_retention=idempotency_retention,
206+
workflow_retention=workflow_retention,
207+
enable_lazy_state=enable_lazy_state,
208+
ingress_private=ingress_private)
153209

154210
vars(wrapped)[RESTATE_UNIQUE_HANDLER_SYMBOL] = handler
155211
return handler

0 commit comments

Comments
 (0)