Skip to content

Commit db40143

Browse files
committed
Reject duplicate SSE connections with 409 to prevent stream hijacking
An attacker who obtains a valid session ID could issue a GET request to establish a new SSE stream, silently replacing the victim's stream. All subsequent server-to-client messages (tool responses, notifications) would then be delivered to the attacker instead of the legitimate client, with no indication to the victim. The fix mirrors the Python SDK's approach: when a GET request arrives for a session that already has an active SSE stream, respond with HTTP 409 Conflict ("Only one SSE stream is allowed per session") and leave the existing connection intact. Additionally, `store_stream_for_session` checks for an existing stream under the mutex before storing a new one, closing the TOCTOU window between the HTTP-level check and the deferred stream assignment. Note: exploiting this requires prior knowledge of the session ID (a `SecureRandom.uuid`), so the practical severity is low in typical local MCP deployments. The change is still worthwhile as defense-in-depth, especially for multi-user or network-exposed server deployments.
1 parent 7cede9c commit db40143

File tree

2 files changed

+75
-4
lines changed

2 files changed

+75
-4
lines changed

lib/mcp/server/transports/streamable_http_transport.rb

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def handle_get(request)
142142

143143
return missing_session_id_response unless session_id
144144
return session_not_found_response unless session_exists?(session_id)
145+
return session_already_connected_response if get_session_stream(session_id)
145146

146147
setup_sse_stream(session_id)
147148
end
@@ -315,6 +316,14 @@ def session_not_found_response
315316
[404, { "Content-Type" => "application/json" }, [{ error: "Session not found" }.to_json]]
316317
end
317318

319+
def session_already_connected_response
320+
[
321+
409,
322+
{ "Content-Type" => "application/json" },
323+
[{ error: "Conflict: Only one SSE stream is allowed per session" }.to_json],
324+
]
325+
end
326+
318327
def setup_sse_stream(session_id)
319328
body = create_sse_body(session_id)
320329

@@ -329,17 +338,22 @@ def setup_sse_stream(session_id)
329338

330339
def create_sse_body(session_id)
331340
proc do |stream|
332-
store_stream_for_session(session_id, stream)
333-
start_keepalive_thread(session_id)
341+
stored = store_stream_for_session(session_id, stream)
342+
start_keepalive_thread(session_id) if stored
334343
end
335344
end
336345

337346
def store_stream_for_session(session_id, stream)
338347
@mutex.synchronize do
339-
if @sessions[session_id]
340-
@sessions[session_id][:stream] = stream
348+
session = @sessions[session_id]
349+
if session && !session[:stream]
350+
session[:stream] = stream
341351
else
352+
# Either session was removed, or another request already established a stream.
342353
stream.close
354+
# `stream.close` may return a truthy value depending on the stream class.
355+
# Explicitly return nil to guarantee a falsy return for callers.
356+
nil
343357
end
344358
end
345359
end

test/mcp/server/transports/streamable_http_transport_test.rb

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,63 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
272272
assert_equal "Missing session ID", body["error"]
273273
end
274274

275+
test "rejects duplicate SSE connection with 409" do
276+
# Create a session
277+
init_request = create_rack_request(
278+
"POST",
279+
"/",
280+
{ "CONTENT_TYPE" => "application/json" },
281+
{ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json,
282+
)
283+
init_response = @transport.handle_request(init_request)
284+
session_id = init_response[1]["Mcp-Session-Id"]
285+
286+
# Simulate an active SSE stream by storing a stream object in the session
287+
mock_stream = StringIO.new
288+
@transport.instance_variable_get(:@sessions)[session_id][:stream] = mock_stream
289+
290+
# Attempt a second GET request for the same session
291+
get_request = create_rack_request(
292+
"GET",
293+
"/",
294+
{ "HTTP_MCP_SESSION_ID" => session_id },
295+
)
296+
297+
response = @transport.handle_request(get_request)
298+
assert_equal 409, response[0]
299+
assert_equal({ "Content-Type" => "application/json" }, response[1])
300+
301+
body = JSON.parse(response[2][0])
302+
assert_equal "Conflict: Only one SSE stream is allowed per session", body["error"]
303+
end
304+
305+
test "store_stream_for_session does not overwrite existing stream (TOCTOU guard)" do
306+
# Create a session
307+
init_request = create_rack_request(
308+
"POST",
309+
"/",
310+
{ "CONTENT_TYPE" => "application/json" },
311+
{ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json,
312+
)
313+
init_response = @transport.handle_request(init_request)
314+
session_id = init_response[1]["Mcp-Session-Id"]
315+
316+
# Establish stream A
317+
stream_a = StringIO.new
318+
@transport.send(:store_stream_for_session, session_id, stream_a)
319+
assert_equal stream_a, @transport.instance_variable_get(:@sessions)[session_id][:stream]
320+
321+
# Attempt to store stream B (simulating a racing request)
322+
stream_b = StringIO.new
323+
@transport.send(:store_stream_for_session, session_id, stream_b)
324+
325+
# Stream A should still be the active stream
326+
assert_equal stream_a, @transport.instance_variable_get(:@sessions)[session_id][:stream]
327+
328+
# Stream B should have been closed
329+
assert stream_b.closed?
330+
end
331+
275332
test "handles GET request with invalid session ID" do
276333
request = create_rack_request(
277334
"GET",

0 commit comments

Comments
 (0)