Skip to content

Commit ab586e8

Browse files
committed
Use EventSource for SSE implementation
1 parent 4e03f8f commit ab586e8

File tree

1 file changed

+22
-90
lines changed

1 file changed

+22
-90
lines changed

Sources/MCP/Base/Transports/HTTPClientTransport.swift

Lines changed: 22 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import EventSource
12
import Foundation
23
import Logging
34

@@ -11,7 +12,6 @@ public actor HTTPClientTransport: Actor, Transport {
1112
public private(set) var sessionID: String?
1213
private let streaming: Bool
1314
private var streamingTask: Task<Void, Never>?
14-
private var lastEventID: String?
1515
public nonisolated let logger: Logger
1616

1717
private var isConnected = false
@@ -183,17 +183,13 @@ public actor HTTPClientTransport: Actor, Transport {
183183
var request = URLRequest(url: endpoint)
184184
request.httpMethod = "GET"
185185
request.addValue("text/event-stream", forHTTPHeaderField: "Accept")
186+
request.addValue("no-cache", forHTTPHeaderField: "Cache-Control")
186187

187188
// Add session ID if available
188189
if let sessionID = sessionID {
189190
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
190191
}
191192

192-
// Add Last-Event-ID header for resumability if available
193-
if let lastEventID = lastEventID {
194-
request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID")
195-
}
196-
197193
logger.debug("Starting SSE connection")
198194

199195
// Create URLSession task for SSE
@@ -217,95 +213,31 @@ public actor HTTPClientTransport: Actor, Transport {
217213
// Extract session ID if present
218214
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
219215
self.sessionID = newSessionID
216+
logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"])
220217
}
221218

222219
// Process the SSE stream
223-
var buffer = ""
224-
var eventType = ""
225-
var eventID: String?
226-
var eventData = ""
227-
228-
for try await byte in stream {
229-
if Task.isCancelled { break }
230-
231-
guard let char = String(bytes: [byte], encoding: .utf8) else { continue }
232-
buffer.append(char)
233-
234-
// Process complete lines
235-
while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) {
236-
var line = buffer[..<newlineIndex]
237-
if line.hasSuffix("\r") {
238-
line = line.dropLast()
239-
}
240-
241-
buffer = String(buffer[buffer.index(after: newlineIndex)...])
242-
243-
// Empty line marks the end of an event
244-
if line.isEmpty {
245-
if !eventData.isEmpty {
246-
// Process the event
247-
if eventType == "id" {
248-
lastEventID = eventID
249-
} else {
250-
// Default event type is "message" if not specified
251-
if let data = eventData.data(using: .utf8) {
252-
logger.debug(
253-
"SSE event received",
254-
metadata: [
255-
"type": "\(eventType.isEmpty ? "message" : eventType)",
256-
"id": "\(eventID ?? "none")",
257-
])
258-
messageContinuation.yield(data)
259-
}
260-
}
261-
262-
// Reset for next event
263-
eventType = ""
264-
eventData = ""
265-
}
266-
continue
267-
}
268-
269-
if line.hasSuffix("\r") {
270-
line = line.dropLast()
271-
}
272-
273-
// Lines starting with ":" are comments
274-
if line.hasPrefix(":") { continue }
275-
276-
// Parse field: value format
277-
if let colonIndex = line.firstIndex(of: ":") {
278-
let field = String(line[..<colonIndex])
279-
var value = String(line[line.index(after: colonIndex)...])
280-
281-
// Trim leading space
282-
if value.hasPrefix(" ") {
283-
value = String(value.dropFirst())
284-
}
285-
286-
// Process based on field
287-
switch field {
288-
case "event":
289-
eventType = value
290-
case "data":
291-
if !eventData.isEmpty {
292-
eventData.append("\n")
293-
}
294-
eventData.append(value)
295-
case "id":
296-
if !value.contains("\0") { // ID must not contain NULL
297-
eventID = value
298-
lastEventID = value
299-
}
300-
case "retry":
301-
// Retry timing not implemented
302-
break
303-
default:
304-
// Unknown fields are ignored per SSE spec
305-
break
306-
}
220+
do {
221+
for try await event in stream.events {
222+
// Check if task has been cancelled
223+
if Task.isCancelled { break }
224+
225+
logger.debug(
226+
"SSE event received",
227+
metadata: [
228+
"type": "\(event.event ?? "message")",
229+
"id": "\(event.id ?? "none")",
230+
]
231+
)
232+
233+
// Convert the event data to Data and yield it to the message stream
234+
if !event.data.isEmpty, let data = event.data.data(using: .utf8) {
235+
messageContinuation.yield(data)
307236
}
308237
}
238+
} catch {
239+
logger.error("Error processing SSE events: \(error)")
240+
throw error
309241
}
310242
}
311243
#endif

0 commit comments

Comments
 (0)