Skip to content

fix(anthropic): handle malformed streamed responses gracefully #13629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions ddtrace/contrib/internal/anthropic/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ def _extract_from_chunk(chunk, message) -> Tuple[Dict[str, str], bool]:
}
chunk_type = _get_attr(chunk, "type", "")
transformation = TRANSFORMATIONS_BY_BLOCK_TYPE.get(chunk_type)
if transformation is not None:
message = transformation(chunk, message)

return message
return transformation(chunk, message) if transformation is not None else message


def _on_message_start_chunk(chunk, message):
Expand Down Expand Up @@ -221,20 +218,28 @@ def _on_content_block_start_chunk(chunk, message):
def _on_content_block_delta_chunk(chunk, message):
    """Fold a ``content_block_delta`` stream event into the accumulated message.

    Delta events carry incremental content for the *current* (last) entry of
    ``message["content"]``: either plain text (``text``) or a partial JSON
    string for a tool-use input block (``partial_json``).

    The scraped diff concatenated the removed and added hunks of this function;
    this is the reconstructed post-fix version, which guards against malformed
    streams: a missing ``delta`` payload or a delta arriving before any
    ``content_block_start`` (empty ``message["content"]``) is ignored instead
    of raising ``IndexError``.

    :param chunk: streamed event object (attribute- or dict-style access via
        ``_get_attr``).
    :param message: accumulator dict with a ``"content"`` list of blocks.
    :return: the (possibly updated) ``message`` dict.
    """
    # delta events contain new content for the current message.content block
    delta_block = _get_attr(chunk, "delta", "")
    if delta_block is None:
        # malformed event: no delta payload at all
        return message

    if len(message["content"]) == 0:
        # malformed stream: delta arrived before any content_block_start,
        # so there is no current block to append to
        return message

    chunk_content_text = _get_attr(delta_block, "text", "")
    if chunk_content_text:
        message["content"][-1]["text"] += chunk_content_text

    chunk_content_json = _get_attr(delta_block, "partial_json", "")
    if chunk_content_json and _get_attr(delta_block, "type", "") == "input_json_delta":
        # we have a json content block, most likely a tool input dict
        message["content"][-1]["input"] += chunk_content_json
    return message


def _on_content_block_stop_chunk(chunk, message):
# this is the start to a message.content block (possibly 1 of several content blocks)
if len(message["content"]) == 0:
return message

content_type = _get_attr(message["content"][-1], "type", "")
if content_type == "tool_use":
input_json = _get_attr(message["content"][-1], "input", "{}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
interactions:
- request:
body: '{"max_tokens": 200, "messages": [{"role": "user", "content": "What is the
weather in San Francisco, CA?"}], "model": "claude-3-opus-20240229", "tools":
[{"name": "get_weather", "description": "Get the weather for a specific location",
"input_schema": {"type": "object", "properties": {"location": {"type": "string"}}}}],
"stream": true}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
anthropic-version:
- '2023-06-01'
connection:
- keep-alive
content-length:
- '336'
content-type:
- application/json
host:
- api.anthropic.com
user-agent:
- AsyncAnthropic/Python 0.28.0
x-stainless-arch:
- arm64
x-stainless-async:
- async:asyncio
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 0.28.0
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.10.13
x-stainless-stream-helper:
- messages
method: POST
uri: https://api.anthropic.com/v1/messages
response:
body:
string: 'event: message_start

data: {"type":"message_start","message":{"id":"msg_01Tx24z76YJbcUHzKJbPeTpu","type":"message","role":"assistant","model":"claude-3-opus-20240229","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":599,"output_tokens":2}} }


event: content_block_start

data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} }


event: ping

data: {"type": "ping"}


event: content_block_stop

data: {"type":"content_block_stop","index":0 }


'
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 892ac094ac670f98-EWR
Cache-Control:
- no-cache
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Wed, 12 Jun 2024 15:07:46 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
anthropic-ratelimit-requests-limit:
- '4000'
anthropic-ratelimit-requests-remaining:
- '3999'
anthropic-ratelimit-requests-reset:
- '2024-06-12T15:08:35Z'
anthropic-ratelimit-tokens-limit:
- '400000'
anthropic-ratelimit-tokens-remaining:
- '399000'
anthropic-ratelimit-tokens-reset:
- '2024-06-12T15:08:35Z'
request-id:
- req_01UCCQXyHwxBKbXcKzYabxAx
via:
- 1.1 google
status:
code: 200
message: OK
version: 1
13 changes: 13 additions & 0 deletions tests/contrib/anthropic/test_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,16 @@ async def test_anthropic_llm_async_tools_stream_full_use(anthropic, request_vcr,
)
async for _ in stream:
pass


def test_anthropic_streaming_with_malformed_streamed_events(anthropic, request_vcr):
    """A malformed streamed event must be handled gracefully rather than raising."""
    with request_vcr.use_cassette("anthropic_completion_stream_malformed.yaml"):
        client = anthropic.Anthropic()
        stream_manager = client.messages.stream(
            model="claude-3-opus-20240229",
            max_tokens=200,
            messages=[{"role": "user", "content": "What is the weather in San Francisco, CA?"}],
        )
        # Draining the text stream is enough: any unhandled error in the
        # instrumentation would surface here as an exception.
        with stream_manager as stream:
            for _chunk in stream.text_stream:
                pass
12 changes: 12 additions & 0 deletions tests/contrib/anthropic/test_anthropic_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,3 +775,15 @@ async def test_tools_async_stream_helper(
tags={"ml_app": "<ml-app-name>", "service": "tests.contrib.anthropic"},
)
)

def test_anthropic_streaming_with_malformed_streamed_events(self, anthropic, request_vcr):
    """A malformed streamed event must be handled gracefully rather than raising."""
    with request_vcr.use_cassette("anthropic_completion_stream_malformed.yaml"):
        client = anthropic.Anthropic()
        stream_manager = client.messages.stream(
            model="claude-3-opus-20240229",
            max_tokens=200,
            messages=[{"role": "user", "content": WEATHER_PROMPT}],
        )
        # Consuming the stream is the assertion: a malformed event that
        # broke the instrumentation would raise during iteration.
        with stream_manager as stream:
            for _chunk in stream.text_stream:
                pass
Loading