@@ -103,7 +103,7 @@ public actor HTTPClientTransport: Actor, Transport {
103
103
request. addValue ( sessionID, forHTTPHeaderField: " Mcp-Session-Id " )
104
104
}
105
105
106
- let ( responseData , response) = try await session. data ( for: request)
106
+ let ( responseStream , response) = try await session. bytes ( for: request)
107
107
108
108
guard let httpResponse = response as? HTTPURLResponse else {
109
109
throw MCPError . internalError ( " Invalid HTTP response " )
@@ -120,19 +120,20 @@ public actor HTTPClientTransport: Actor, Transport {
120
120
121
121
// Handle different response types
122
122
switch httpResponse. statusCode {
123
- case 200 , 201 , 202 :
123
+ case 200 ..< 300 where contentType . contains ( " text/event-stream " ) :
124
124
// For SSE, the processing happens in the streaming task
125
- if contentType. contains ( " text/event-stream " ) {
126
- logger. debug ( " Received SSE response, processing in streaming task " )
127
- // The streaming is handled by the SSE task if active
128
- return
129
- }
125
+ logger. debug ( " Received SSE response, processing in streaming task " )
126
+ try await self . processSSE ( responseStream)
130
127
128
+ case 200 ..< 300 where contentType. contains ( " application/json " ) :
131
129
// For JSON responses, deliver the data directly
132
- if contentType . contains ( " application/json " ) && !responseData . isEmpty {
133
- logger . debug ( " Received JSON response " , metadata : [ " size " : " \( responseData . count ) " ] )
134
- messageContinuation . yield ( responseData )
130
+ var buffer = Data ( )
131
+ for try await byte in responseStream {
132
+ buffer . append ( byte )
135
133
}
134
+ logger. debug ( " Received JSON response " , metadata: [ " size " : " \( buffer. count) " ] )
135
+ messageContinuation. yield ( buffer)
136
+
136
137
case 404 :
137
138
// If we get a 404 with a session ID, it means our session is invalid
138
139
if sessionID != nil {
@@ -141,8 +142,16 @@ public actor HTTPClientTransport: Actor, Transport {
141
142
throw MCPError . internalError ( " Session expired " )
142
143
}
143
144
throw MCPError . internalError ( " Endpoint not found " )
145
+
146
+ case 405 :
147
+ // If we get a 405, it means the server does not support streaming,
148
+ // so we should cancel the streaming task.
149
+ self . streamingTask? . cancel ( )
150
+ throw MCPError . internalError ( " Server does not support streaming " )
151
+
144
152
default :
145
- throw MCPError . internalError ( " HTTP error: \( httpResponse. statusCode) " )
153
+ throw MCPError . internalError (
154
+ " Unexpected HTTP response: \( httpResponse. statusCode) \( contentType) " )
146
155
}
147
156
}
148
157
@@ -175,6 +184,10 @@ public actor HTTPClientTransport: Actor, Transport {
175
184
private func connectToEventStream( ) async throws {
176
185
logger. warning ( " SSE is not supported on this platform " )
177
186
}
187
+
188
+ private func processSSE( _ stream: URLSession . AsyncBytes ) async throws {
189
+ logger. warning ( " SSE is not supported on this platform " )
190
+ }
178
191
#else
179
192
/// Establishes an SSE connection to the server
180
193
private func connectToEventStream( ) async throws {
@@ -216,7 +229,10 @@ public actor HTTPClientTransport: Actor, Transport {
216
229
logger. debug ( " Session ID received " , metadata: [ " sessionID " : " \( newSessionID) " ] )
217
230
}
218
231
219
- // Process the SSE stream
232
+ try await self . processSSE ( stream)
233
+ }
234
+
235
+ private func processSSE( _ stream: URLSession . AsyncBytes ) async throws {
220
236
do {
221
237
for try await event in stream. events {
222
238
// Check if task has been cancelled
0 commit comments