Skip to content

Commit 3eaff73

Browse files
EleanorWho, claude, cdoern
authored
feat(messages): emit ping and error stream events for Anthropic SDK compatibility (#6007)
## Summary

- Add `PingEvent` and `ErrorStreamEvent` models to `AnthropicStreamEvent` union
- Emit ping heartbeats after `message_start` and each `content_block_stop` in translation-mode streams to prevent load-balancer idle-timeout disconnects
- Wrap `_stream_openai_to_anthropic` and `_passthrough_stream` in error handling that yields `ErrorStreamEvent` before closing, so clients receive typed errors instead of truncated streams
- Handle `ping` and `error` event types in `_parse_sse_event` for passthrough mode

## Test plan

- Unit tests for ping event placement in text and tool streams
- Unit tests for error event emission on mid-stream exceptions
- Unit tests for ping/error parsing in `_parse_sse_event`
- Full unit test suite passes with no regressions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- devin-review-badge-begin -->

---

<a href="https://app.devin.ai/review/ogx-ai/ogx/pull/6007" target="_blank">
  <picture>
    <source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1">
    <img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open in Devin Review">
  </picture>
</a>

<!-- devin-review-badge-end -->

---------

Signed-off-by: Eleanor Hu <ehu@redhat.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Charlie Doern <cdoern@redhat.com>
1 parent 8b76f28 commit 3eaff73

3 files changed

Lines changed: 257 additions & 93 deletions

File tree

src/ogx/providers/inline/messages/impl.py

Lines changed: 99 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
ContentBlockStartEvent,
6161
ContentBlockStopEvent,
6262
CreateMessageBatchRequest,
63+
ErrorStreamEvent,
6364
ListMessageBatchesRequest,
6465
ListMessageBatchesResponse,
6566
MessageBatch,
@@ -71,6 +72,7 @@
7172
MessageDeltaEvent,
7273
MessageStartEvent,
7374
MessageStopEvent,
75+
PingEvent,
7476
RetrieveMessageBatchRequest,
7577
RetrieveMessageBatchResultsRequest,
7678
_AnthropicErrorDetail,
@@ -515,19 +517,26 @@ async def _passthrough_stream(
515517
body: dict[str, Any],
516518
) -> AsyncIterator[AnthropicStreamEvent]:
517519
"""Stream SSE events directly from the provider."""
518-
async with self._client.stream("POST", url, json=body, headers=headers, timeout=300) as resp:
519-
resp.raise_for_status()
520-
event_type = None
521-
async for line in resp.aiter_lines():
522-
line = line.strip()
523-
if line.startswith("event: "):
524-
event_type = line[7:]
525-
elif line.startswith("data: ") and event_type:
526-
data = json.loads(line[6:])
527-
event = self._parse_sse_event(event_type, data)
528-
if event:
529-
yield event
530-
event_type = None
520+
try:
521+
async with self._client.stream("POST", url, json=body, headers=headers, timeout=300) as resp:
522+
resp.raise_for_status()
523+
event_type = None
524+
async for line in resp.aiter_lines():
525+
line = line.strip()
526+
if line.startswith("event: "):
527+
event_type = line[7:]
528+
elif line.startswith("data: ") and event_type:
529+
data = json.loads(line[6:])
530+
event = self._parse_sse_event(event_type, data)
531+
if event:
532+
yield event
533+
event_type = None
534+
except Exception:
535+
logger.exception("Failed to stream passthrough response")
536+
yield ErrorStreamEvent(
537+
error=_AnthropicErrorDetail(type="api_error", message="Internal server error"),
538+
)
539+
return
531540

532541
def _parse_sse_event(self, event_type: str, data: dict[str, Any]) -> AnthropicStreamEvent | None:
533542
"""Parse an Anthropic SSE event from its type and data."""
@@ -572,6 +581,10 @@ def _parse_sse_event(self, event_type: str, data: dict[str, Any]) -> AnthropicSt
572581
)
573582
if event_type == "message_stop":
574583
return MessageStopEvent()
584+
if event_type == "ping":
585+
return PingEvent()
586+
if event_type == "error":
587+
return ErrorStreamEvent(error=_AnthropicErrorDetail(**data["error"]))
575588
return None
576589

577590
async def _passthrough_count_tokens(
@@ -874,6 +887,7 @@ async def _stream_openai_to_anthropic(
874887
usage=AnthropicUsage(input_tokens=0, output_tokens=0),
875888
),
876889
)
890+
yield PingEvent()
877891

878892
content_block_index = 0
879893
in_text_block = False
@@ -884,81 +898,93 @@ async def _stream_openai_to_anthropic(
884898
cache_read_tokens: int | None = None
885899
stop_reason = "end_turn"
886900

887-
async for chunk in openai_stream:
888-
if not chunk.choices:
889-
# Usage-only chunk
890-
if chunk.usage:
891-
input_tokens = chunk.usage.prompt_tokens or 0
892-
output_tokens = chunk.usage.completion_tokens or 0
893-
if chunk.usage.prompt_tokens_details and hasattr(
894-
chunk.usage.prompt_tokens_details, "cached_tokens"
895-
):
896-
cache_read_tokens = chunk.usage.prompt_tokens_details.cached_tokens
897-
continue
898-
899-
choice = chunk.choices[0]
900-
delta = choice.delta
901+
try:
902+
async for chunk in openai_stream:
903+
if not chunk.choices:
904+
# Usage-only chunk
905+
if chunk.usage:
906+
input_tokens = chunk.usage.prompt_tokens or 0
907+
output_tokens = chunk.usage.completion_tokens or 0
908+
if chunk.usage.prompt_tokens_details and hasattr(
909+
chunk.usage.prompt_tokens_details, "cached_tokens"
910+
):
911+
cache_read_tokens = chunk.usage.prompt_tokens_details.cached_tokens
912+
continue
913+
914+
choice = chunk.choices[0]
915+
delta = choice.delta
916+
917+
if delta and delta.content:
918+
if not in_text_block:
919+
yield ContentBlockStartEvent(
920+
index=content_block_index,
921+
content_block=AnthropicTextBlock(text=""),
922+
)
923+
in_text_block = True
901924

902-
if delta and delta.content:
903-
if not in_text_block:
904-
yield ContentBlockStartEvent(
925+
yield ContentBlockDeltaEvent(
905926
index=content_block_index,
906-
content_block=AnthropicTextBlock(text=""),
927+
delta=_TextDelta(text=delta.content),
907928
)
908-
in_text_block = True
909-
910-
yield ContentBlockDeltaEvent(
911-
index=content_block_index,
912-
delta=_TextDelta(text=delta.content),
913-
)
914-
915-
if delta and delta.tool_calls:
916-
for tc_delta in delta.tool_calls:
917-
tc_idx = tc_delta.index if tc_delta.index is not None else 0
918929

919-
if tc_idx not in in_tool_blocks:
920-
# Close text block if open
921-
if in_text_block:
922-
yield ContentBlockStopEvent(index=content_block_index)
930+
if delta and delta.tool_calls:
931+
for tc_delta in delta.tool_calls:
932+
tc_idx = tc_delta.index if tc_delta.index is not None else 0
933+
934+
if tc_idx not in in_tool_blocks:
935+
# Close text block if open
936+
if in_text_block:
937+
yield ContentBlockStopEvent(index=content_block_index)
938+
yield PingEvent()
939+
content_block_index += 1
940+
in_text_block = False
941+
942+
# Start new tool_use block
943+
in_tool_blocks[tc_idx] = True
944+
tool_call_index_to_block_index[tc_idx] = content_block_index
945+
946+
yield ContentBlockStartEvent(
947+
index=content_block_index,
948+
content_block=AnthropicToolUseBlock(
949+
id=tc_delta.id or f"toolu_{uuid.uuid4().hex[:24]}",
950+
name=tc_delta.function.name if tc_delta.function and tc_delta.function.name else "",
951+
input={},
952+
),
953+
)
923954
content_block_index += 1
924-
in_text_block = False
925-
926-
# Start new tool_use block
927-
in_tool_blocks[tc_idx] = True
928-
tool_call_index_to_block_index[tc_idx] = content_block_index
929955

930-
yield ContentBlockStartEvent(
931-
index=content_block_index,
932-
content_block=AnthropicToolUseBlock(
933-
id=tc_delta.id or f"toolu_{uuid.uuid4().hex[:24]}",
934-
name=tc_delta.function.name if tc_delta.function and tc_delta.function.name else "",
935-
input={},
936-
),
937-
)
938-
content_block_index += 1
939-
940-
if tc_delta.function and tc_delta.function.arguments:
941-
block_idx = tool_call_index_to_block_index[tc_idx]
942-
yield ContentBlockDeltaEvent(
943-
index=block_idx,
944-
delta=_InputJsonDelta(partial_json=tc_delta.function.arguments),
945-
)
956+
if tc_delta.function and tc_delta.function.arguments:
957+
block_idx = tool_call_index_to_block_index[tc_idx]
958+
yield ContentBlockDeltaEvent(
959+
index=block_idx,
960+
delta=_InputJsonDelta(partial_json=tc_delta.function.arguments),
961+
)
946962

947-
if choice.finish_reason:
948-
stop_reason = _FINISH_TO_STOP_REASON.get(choice.finish_reason, "end_turn")
963+
if choice.finish_reason:
964+
stop_reason = _FINISH_TO_STOP_REASON.get(choice.finish_reason, "end_turn")
949965

950-
if chunk.usage:
951-
input_tokens = chunk.usage.prompt_tokens or 0
952-
output_tokens = chunk.usage.completion_tokens or 0
953-
if chunk.usage.prompt_tokens_details and hasattr(chunk.usage.prompt_tokens_details, "cached_tokens"):
954-
cache_read_tokens = chunk.usage.prompt_tokens_details.cached_tokens
966+
if chunk.usage:
967+
input_tokens = chunk.usage.prompt_tokens or 0
968+
output_tokens = chunk.usage.completion_tokens or 0
969+
if chunk.usage.prompt_tokens_details and hasattr(
970+
chunk.usage.prompt_tokens_details, "cached_tokens"
971+
):
972+
cache_read_tokens = chunk.usage.prompt_tokens_details.cached_tokens
973+
except Exception:
974+
logger.exception("Failed to stream translation response")
975+
yield ErrorStreamEvent(
976+
error=_AnthropicErrorDetail(type="api_error", message="Internal server error"),
977+
)
978+
return
955979

956980
# Close any open blocks
957981
if in_text_block:
958982
yield ContentBlockStopEvent(index=content_block_index)
983+
yield PingEvent()
959984

960985
for _tc_idx, block_idx in tool_call_index_to_block_index.items():
961986
yield ContentBlockStopEvent(index=block_idx)
987+
yield PingEvent()
962988

963989
# Final events
964990
yield MessageDeltaEvent(

src/ogx_api/messages/models.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -434,16 +434,6 @@ class MessageStopEvent(BaseModel):
434434
type: Literal["message_stop"] = "message_stop"
435435

436436

437-
AnthropicStreamEvent = (
438-
MessageStartEvent
439-
| ContentBlockStartEvent
440-
| ContentBlockDeltaEvent
441-
| ContentBlockStopEvent
442-
| MessageDeltaEvent
443-
| MessageStopEvent
444-
)
445-
446-
447437
# -- Error response --
448438

449439

@@ -459,6 +449,31 @@ class AnthropicErrorResponse(BaseModel):
459449
error: _AnthropicErrorDetail
460450

461451

452+
class PingEvent(BaseModel):
453+
"""Keep-alive heartbeat sent between content blocks."""
454+
455+
type: Literal["ping"] = "ping"
456+
457+
458+
class ErrorStreamEvent(BaseModel):
459+
"""Mid-stream error event sent before the stream closes."""
460+
461+
type: Literal["error"] = "error"
462+
error: _AnthropicErrorDetail
463+
464+
465+
AnthropicStreamEvent = (
466+
MessageStartEvent
467+
| ContentBlockStartEvent
468+
| ContentBlockDeltaEvent
469+
| ContentBlockStopEvent
470+
| MessageDeltaEvent
471+
| MessageStopEvent
472+
| PingEvent
473+
| ErrorStreamEvent
474+
)
475+
476+
462477
# -- Message Batches --
463478

464479

0 commit comments

Comments
 (0)