|
| 1 | +use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamIter}; |
| 2 | +use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; |
| 3 | + |
/// Stateful processor for SSE chunks whose final event may be cut off mid-way.
///
/// Network chunk boundaries do not have to line up with SSE event boundaries,
/// so the last event of a chunk can arrive with its JSON payload truncated.
/// When transforming such an event fails, its raw bytes are kept here and
/// prepended to the next chunk so the event can be retried once complete.
///
/// `Default` yields the same empty processor as `SseChunkProcessor::new()`.
#[derive(Debug, Default)]
pub struct SseChunkProcessor {
    /// Raw bytes of the trailing incomplete SSE event carried over from the
    /// previous chunk; empty when nothing is pending.
    incomplete_event_buffer: Vec<u8>,
}
| 12 | + |
| 13 | +impl SseChunkProcessor { |
| 14 | + pub fn new() -> Self { |
| 15 | + Self { |
| 16 | + incomplete_event_buffer: Vec::new(), |
| 17 | + } |
| 18 | + } |
| 19 | + |
| 20 | + /// Process a chunk of SSE data, handling incomplete events across chunk boundaries. |
| 21 | + /// |
| 22 | + /// Returns successfully transformed events. Incomplete events are buffered internally |
| 23 | + /// and will be retried when more data arrives in the next chunk. |
| 24 | + /// |
| 25 | + /// # Arguments |
| 26 | + /// * `chunk` - Raw bytes from upstream SSE stream |
| 27 | + /// * `client_api` - The API format the client expects |
| 28 | + /// * `upstream_api` - The API format from the upstream provider |
| 29 | + /// |
| 30 | + /// # Returns |
| 31 | + /// * `Ok(Vec<SseEvent>)` - Successfully transformed events ready for client |
| 32 | + /// * `Err(String)` - Fatal error that cannot be recovered by buffering |
| 33 | + pub fn process_chunk( |
| 34 | + &mut self, |
| 35 | + chunk: &[u8], |
| 36 | + client_api: &SupportedAPIsFromClient, |
| 37 | + upstream_api: &SupportedUpstreamAPIs, |
| 38 | + ) -> Result<Vec<SseEvent>, String> { |
| 39 | + // Combine buffered incomplete event with new chunk |
| 40 | + let mut combined_data = std::mem::take(&mut self.incomplete_event_buffer); |
| 41 | + combined_data.extend_from_slice(chunk); |
| 42 | + |
| 43 | + // Parse using SseStreamIter |
| 44 | + let sse_iter = match SseStreamIter::try_from(combined_data.as_slice()) { |
| 45 | + Ok(iter) => iter, |
| 46 | + Err(e) => return Err(format!("Failed to create SSE iterator: {}", e)), |
| 47 | + }; |
| 48 | + |
| 49 | + let mut transformed_events = Vec::new(); |
| 50 | + |
| 51 | + // Process each parsed SSE event |
| 52 | + for sse_event in sse_iter { |
| 53 | + // Try to transform the event (this is where incomplete JSON fails) |
| 54 | + match SseEvent::try_from((sse_event.clone(), client_api, upstream_api)) { |
| 55 | + Ok(transformed) => { |
| 56 | + // Successfully transformed - add to results |
| 57 | + transformed_events.push(transformed); |
| 58 | + } |
| 59 | + Err(e) => { |
| 60 | + // Check if this is incomplete JSON (EOF while parsing) vs other errors |
| 61 | + let error_str = e.to_string().to_lowercase(); |
| 62 | + let is_incomplete_json = error_str.contains("eof while parsing") |
| 63 | + || error_str.contains("unexpected end of json") |
| 64 | + || error_str.contains("unexpected eof"); |
| 65 | + |
| 66 | + if is_incomplete_json { |
| 67 | + // Incomplete JSON - buffer for retry with next chunk |
| 68 | + self.incomplete_event_buffer = sse_event.raw_line.as_bytes().to_vec(); |
| 69 | + break; |
| 70 | + } else { |
| 71 | + // Other error (unsupported event type, validation error, etc.) |
| 72 | + // Skip this event and continue processing others |
| 73 | + continue; |
| 74 | + } |
| 75 | + } |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + Ok(transformed_events) |
| 80 | + } |
| 81 | + |
| 82 | + /// Check if there are buffered incomplete bytes |
| 83 | + pub fn has_buffered_data(&self) -> bool { |
| 84 | + !self.incomplete_event_buffer.is_empty() |
| 85 | + } |
| 86 | + |
| 87 | + /// Get the size of buffered incomplete data (for debugging/logging) |
| 88 | + pub fn buffered_size(&self) -> usize { |
| 89 | + self.incomplete_event_buffer.len() |
| 90 | + } |
| 91 | +} |
| 92 | + |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::apis::anthropic::AnthropicApi;
    use crate::apis::openai::OpenAIApi;
    use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs};

    /// Client/upstream pair where both sides speak OpenAI chat completions.
    fn openai_chat_apis() -> (SupportedAPIsFromClient, SupportedUpstreamAPIs) {
        (
            SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions),
            SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions),
        )
    }

    /// Client/upstream pair where both sides speak the Anthropic messages API.
    fn anthropic_messages_apis() -> (SupportedAPIsFromClient, SupportedUpstreamAPIs) {
        (
            SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages),
            SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages),
        )
    }

    /// A single complete event is emitted right away and nothing is buffered.
    #[test]
    fn test_complete_events_process_immediately() {
        let (client_api, upstream_api) = openai_chat_apis();
        let mut processor = SseChunkProcessor::new();

        let payload = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n";

        let events = processor
            .process_chunk(payload, &client_api, &upstream_api)
            .unwrap();

        assert_eq!(events.len(), 1);
        assert!(!processor.has_buffered_data());
    }

    /// An event split across two chunks is held back, then emitted once whole.
    #[test]
    fn test_incomplete_json_buffered_and_completed() {
        let (client_api, upstream_api) = openai_chat_apis();
        let mut processor = SseChunkProcessor::new();

        // The first half stops in the middle of the JSON payload.
        let first_half = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chu";
        let after_first = processor
            .process_chunk(first_half, &client_api, &upstream_api)
            .unwrap();

        assert_eq!(after_first.len(), 0, "Incomplete event should not be processed");
        assert!(processor.has_buffered_data(), "Incomplete data should be buffered");

        // The second half supplies the rest of the JSON payload.
        let second_half = b"nk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n";
        let after_second = processor
            .process_chunk(second_half, &client_api, &upstream_api)
            .unwrap();

        assert_eq!(after_second.len(), 1, "Complete event should be processed");
        assert!(!processor.has_buffered_data(), "Buffer should be cleared after completion");
    }

    /// Complete events ahead of a truncated trailing event are still emitted.
    #[test]
    fn test_multiple_events_with_one_incomplete() {
        let (client_api, upstream_api) = openai_chat_apis();
        let mut processor = SseChunkProcessor::new();

        // Two complete events followed by one that is cut off.
        let payload = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"A\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-124\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"B\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-125\",\"object\":\"chat.completion.chu";

        let events = processor
            .process_chunk(payload, &client_api, &upstream_api)
            .unwrap();

        assert_eq!(events.len(), 2, "Two complete events should be processed");
        assert!(processor.has_buffered_data(), "Incomplete third event should be buffered");
    }

    /// Regression case: a signature_delta event followed by content_block_stop.
    #[test]
    fn test_anthropic_signature_delta_from_production_logs() {
        let (client_api, upstream_api) = anthropic_messages_apis();
        let mut processor = SseChunkProcessor::new();

        // Exact chunk from production logs.
        let payload = br#"event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"ErECCkYIChgCKkC7lAf/BOatd0I4NnANYNEDKl5/WSsjNK44AETnLoy3i5FfdYMAb0m4qMLJD6A04QnM4Hf3VpGqq/snA/9vvNxCEgw3CYcHcj0aTdqOisQaDOhlVBtAUKkoh3WopSIwAbJp4jG/41vVWBj63eaR7KFJ37OdY1byjlPkaGDUJRcWc/YfUWIDSAToomq2fB4VKpgBk+swVYxLZ709gQvyTCT+3vO/I+yexZpkx6eBl/+YCgQXTeviZ+hTxSoPVayf5vEQoc19ZA4MEkZ7yBInRgk8vUxAJITSf+vOvDIBsElpgkLfSjARCasjh78wONg39AkAoIbKzU+Q2l1htUwXcqQ2b+b5DrY9+Oxae4pBVGQlWU36XAHsa/KG+ejfdwhWJM7FNL3uphwAf0oYAQ=="}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

"#;

        let events = processor
            .process_chunk(payload, &client_api, &upstream_api)
            .unwrap_or_else(|e| {
                panic!("Failed to process signature_delta chunk - this means SignatureDelta is not properly handled: {}", e)
            });

        println!("Successfully processed {} events", events.len());
        for (i, event) in events.iter().enumerate() {
            println!("Event {}: event={:?}, has_data={}", i, event.event, event.data.is_some());
        }

        // Both the signature_delta and the content_block_stop must come through.
        assert!(events.len() >= 2, "Should process at least 2 complete events (signature_delta + stop), got {}", events.len());
        assert!(!processor.has_buffered_data(), "Complete events should not be buffered");
    }

    /// An event that is complete but not useful must not hold back later events.
    #[test]
    fn test_unsupported_event_does_not_block_subsequent_events() {
        let (client_api, upstream_api) = openai_chat_apis();
        let mut processor = SseChunkProcessor::new();

        // First event: complete JSON carrying an unexpected delta field.
        // Second event: an ordinary content delta that must be emitted.
        let payload = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"unsupported_field_causing_validation_error\":true},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-124\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n";

        let events = processor
            .process_chunk(payload, &client_api, &upstream_api)
            .unwrap();

        // If every error were buffered we would see 0 events and a non-empty buffer.
        assert!(!events.is_empty(), "Should process at least the valid event, got {} events", events.len());
        assert!(!processor.has_buffered_data(), "Invalid (non-incomplete) events should not be buffered");
    }

    /// An unknown Anthropic delta type is skipped while its neighbours are emitted.
    #[test]
    fn test_unknown_delta_type_skipped_others_processed() {
        let (client_api, upstream_api) = anthropic_messages_apis();
        let mut processor = SseChunkProcessor::new();

        // Valid text_delta, a delta type that does not exist yet, then another
        // valid text_delta - simulating a future upstream API addition.
        let payload = br#"event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"future_unsupported_delta","future_field":"some_value"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" World"}}

"#;

        let events = processor
            .process_chunk(payload, &client_api, &upstream_api)
            .unwrap_or_else(|e| {
                panic!("Should not fail on unsupported delta type, should skip it: {}", e)
            });

        println!("Processed {} events (unsupported event should be skipped)", events.len());

        // At least the two valid text_delta events are expected.
        assert!(events.len() >= 2, "Should process at least 2 valid events, got {}", events.len());
        assert!(!processor.has_buffered_data(), "Unsupported events should be skipped, not buffered");
    }
}
0 commit comments