Skip to content

Commit 39be21a

Browse files
committed
stress: add IO timeouts, jitter, retries, and storm mode to reduce reload crashes
1 parent 5fc661e commit 39be21a

File tree

1 file changed

+174
-111
lines changed

1 file changed

+174
-111
lines changed

tools/stress_mcp.py

Lines changed: 174 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,17 @@
66
import struct
77
import time
88
from pathlib import Path
9+
import random
10+
import sys
11+
12+
13+
TIMEOUT = float(os.environ.get("MCP_STRESS_TIMEOUT", "2.0"))
14+
DEBUG = os.environ.get("MCP_STRESS_DEBUG", "").lower() in ("1", "true", "yes")
15+
16+
17+
def dlog(*args):
18+
if DEBUG:
19+
print(*args, file=sys.stderr)
920

1021

1122
def find_status_files() -> list[Path]:
@@ -60,7 +71,7 @@ async def write_frame(writer: asyncio.StreamWriter, payload: bytes) -> None:
6071
header = struct.pack(">Q", len(payload))
6172
writer.write(header)
6273
writer.write(payload)
63-
await writer.drain()
74+
await asyncio.wait_for(writer.drain(), timeout=TIMEOUT)
6475

6576

6677
async def do_handshake(reader: asyncio.StreamReader) -> None:
@@ -83,152 +94,203 @@ def make_execute_menu_item(menu_path: str) -> bytes:
8394
async def client_loop(idx: int, host: str, port: int, stop_time: float, stats: dict):
8495
reconnect_delay = 0.2
8596
while time.time() < stop_time:
97+
writer = None
8698
try:
87-
reader, writer = await asyncio.open_connection(host, port)
88-
await do_handshake(reader)
99+
# slight stagger to prevent burst synchronization across clients
100+
await asyncio.sleep(0.003 * (idx % 11))
101+
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
102+
await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
89103
# Send a quick ping first
90104
await write_frame(writer, make_ping_frame())
91-
_ = await read_frame(reader) # ignore content
105+
_ = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT) # ignore content
92106

93107
# Main activity loop (keep-alive + light load). Edit spam handled by reload_churn_task.
94108
while time.time() < stop_time:
95109
# Ping-only; edits are sent via reload_churn_task to avoid console spam
96110
await write_frame(writer, make_ping_frame())
97-
_ = await read_frame(reader)
111+
_ = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
98112
stats["pings"] += 1
99-
await asyncio.sleep(0.02)
113+
await asyncio.sleep(0.02 + random.uniform(-0.003, 0.003))
100114

101-
except (ConnectionError, OSError, asyncio.IncompleteReadError):
115+
except (ConnectionError, OSError, asyncio.IncompleteReadError, asyncio.TimeoutError):
102116
stats["disconnects"] += 1
117+
dlog(f"[client {idx}] disconnect/backoff {reconnect_delay}s")
103118
await asyncio.sleep(reconnect_delay)
104119
reconnect_delay = min(reconnect_delay * 1.5, 2.0)
105120
continue
106121
except Exception:
107122
stats["errors"] += 1
123+
dlog(f"[client {idx}] unexpected error")
108124
await asyncio.sleep(0.2)
109125
continue
110126
finally:
111-
try:
112-
writer.close() # type: ignore
113-
await writer.wait_closed() # type: ignore
114-
except Exception:
115-
pass
127+
if writer is not None:
128+
try:
129+
writer.close()
130+
await writer.wait_closed()
131+
except Exception:
132+
pass
116133

117134

118-
async def reload_churn_task(project_path: str, stop_time: float, unity_file: str | None, host: str, port: int, stats: dict):
135+
async def reload_churn_task(project_path: str, stop_time: float, unity_file: str | None, host: str, port: int, stats: dict, storm_count: int = 1):
119136
# Use script edit tool to touch a C# file, which triggers compilation reliably
120137
path = Path(unity_file) if unity_file else None
121138
seq = 0
122139
proj_root = Path(project_path).resolve() if project_path else None
140+
# Build candidate list for storm mode
141+
candidates: list[Path] = []
142+
if proj_root:
143+
try:
144+
for p in (proj_root / "Assets").rglob("*.cs"):
145+
candidates.append(p.resolve())
146+
except Exception:
147+
candidates = []
148+
if path and path.exists():
149+
rp = path.resolve()
150+
if rp not in candidates:
151+
candidates.append(rp)
123152
while time.time() < stop_time:
124153
try:
125154
if path and path.exists():
126-
# Build a tiny ApplyTextEdits request that toggles a trailing comment
127-
relative = None
128-
try:
129-
# Derive Unity-relative path under Assets/ (cross-platform)
130-
resolved = path.resolve()
131-
parts = list(resolved.parts)
132-
if "Assets" in parts:
133-
i = parts.index("Assets")
134-
relative = Path(*parts[i:]).as_posix()
135-
elif proj_root and str(resolved).startswith(str(proj_root)):
136-
rel = resolved.relative_to(proj_root)
137-
parts2 = list(rel.parts)
138-
if "Assets" in parts2:
139-
i2 = parts2.index("Assets")
140-
relative = Path(*parts2[i2:]).as_posix()
141-
except Exception:
155+
# Determine files to touch this cycle
156+
targets: list[Path]
157+
if storm_count and storm_count > 1 and candidates:
158+
k = min(max(1, storm_count), len(candidates))
159+
targets = random.sample(candidates, k)
160+
else:
161+
targets = [path]
162+
163+
for tpath in targets:
164+
# Build a tiny ApplyTextEdits request that toggles a trailing comment
142165
relative = None
143-
144-
if relative:
145-
# Derive name and directory for ManageScript and compute precondition SHA + EOF position
146-
name_base = Path(relative).stem
147-
dir_path = str(Path(relative).parent).replace('\\', '/')
148-
149-
# 1) Read current contents via manage_script.read to compute SHA and true EOF location
150166
try:
151-
reader, writer = await asyncio.open_connection(host, port)
152-
await do_handshake(reader)
153-
read_payload = {
167+
# Derive Unity-relative path under Assets/ (cross-platform)
168+
resolved = tpath.resolve()
169+
parts = list(resolved.parts)
170+
if "Assets" in parts:
171+
i = parts.index("Assets")
172+
relative = Path(*parts[i:]).as_posix()
173+
elif proj_root and str(resolved).startswith(str(proj_root)):
174+
rel = resolved.relative_to(proj_root)
175+
parts2 = list(rel.parts)
176+
if "Assets" in parts2:
177+
i2 = parts2.index("Assets")
178+
relative = Path(*parts2[i2:]).as_posix()
179+
except Exception:
180+
relative = None
181+
182+
if relative:
183+
# Derive name and directory for ManageScript and compute precondition SHA + EOF position
184+
name_base = Path(relative).stem
185+
dir_path = str(Path(relative).parent).replace('\\', '/')
186+
187+
# 1) Read current contents via manage_script.read to compute SHA and true EOF location
188+
contents = None
189+
read_success = False
190+
for attempt in range(3):
191+
writer = None
192+
try:
193+
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
194+
await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
195+
read_payload = {
196+
"type": "manage_script",
197+
"params": {
198+
"action": "read",
199+
"name": name_base,
200+
"path": dir_path
201+
}
202+
}
203+
await write_frame(writer, json.dumps(read_payload).encode("utf-8"))
204+
resp = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
205+
206+
read_obj = json.loads(resp.decode("utf-8", errors="ignore"))
207+
result = read_obj.get("result", read_obj) if isinstance(read_obj, dict) else {}
208+
if result.get("success"):
209+
data_obj = result.get("data", {})
210+
contents = data_obj.get("contents") or ""
211+
read_success = True
212+
break
213+
except Exception:
214+
# retry with backoff
215+
await asyncio.sleep(0.2 * (2 ** attempt) + random.uniform(0.0, 0.1))
216+
finally:
217+
if 'writer' in locals() and writer is not None:
218+
try:
219+
writer.close()
220+
await writer.wait_closed()
221+
except Exception:
222+
pass
223+
224+
if not read_success or contents is None:
225+
stats["apply_errors"] = stats.get("apply_errors", 0) + 1
226+
await asyncio.sleep(0.5)
227+
continue
228+
229+
# Compute SHA and EOF insertion point
230+
import hashlib
231+
sha = hashlib.sha256(contents.encode("utf-8")).hexdigest()
232+
lines = contents.splitlines(keepends=True)
233+
# Insert at true EOF (safe against header guards)
234+
end_line = len(lines) + 1 # 1-based exclusive end
235+
end_col = 1
236+
237+
# Build a unique marker append; ensure it begins with a newline if needed
238+
marker = f"// MCP_STRESS seq={seq} time={int(time.time())}"
239+
seq += 1
240+
insert_text = ("\n" if not contents.endswith("\n") else "") + marker + "\n"
241+
242+
# 2) Apply text edits with immediate refresh and precondition
243+
apply_payload = {
154244
"type": "manage_script",
155245
"params": {
156-
"action": "read",
246+
"action": "apply_text_edits",
157247
"name": name_base,
158-
"path": dir_path
248+
"path": dir_path,
249+
"edits": [
250+
{
251+
"startLine": end_line,
252+
"startCol": end_col,
253+
"endLine": end_line,
254+
"endCol": end_col,
255+
"newText": insert_text
256+
}
257+
],
258+
"precondition_sha256": sha,
259+
"options": {"refresh": "immediate", "validate": "standard"}
159260
}
160261
}
161-
await write_frame(writer, json.dumps(read_payload).encode("utf-8"))
162-
resp = await read_frame(reader)
163-
writer.close()
164-
await writer.wait_closed()
165-
166-
read_obj = json.loads(resp.decode("utf-8", errors="ignore"))
167-
result = read_obj.get("result", read_obj) if isinstance(read_obj, dict) else {}
168-
if not result.get("success"):
169-
stats["apply_errors"] = stats.get("apply_errors", 0) + 1
170-
await asyncio.sleep(0.5)
171-
continue
172-
data_obj = result.get("data", {})
173-
contents = data_obj.get("contents") or ""
174-
except Exception:
175-
stats["apply_errors"] = stats.get("apply_errors", 0) + 1
176-
await asyncio.sleep(0.5)
177-
continue
178-
179-
# Compute SHA and EOF insertion point
180-
import hashlib
181-
sha = hashlib.sha256(contents.encode("utf-8")).hexdigest()
182-
lines = contents.splitlines(keepends=True)
183-
# Insert at true EOF (safe against header guards)
184-
end_line = len(lines) + 1 # 1-based exclusive end
185-
end_col = 1
186-
187-
# Build a unique marker append; ensure it begins with a newline if needed
188-
marker = f"// MCP_STRESS seq={seq} time={int(time.time())}"
189-
seq += 1
190-
insert_text = ("\n" if not contents.endswith("\n") else "") + marker + "\n"
191-
192-
# 2) Apply text edits with immediate refresh and precondition
193-
apply_payload = {
194-
"type": "manage_script",
195-
"params": {
196-
"action": "apply_text_edits",
197-
"name": name_base,
198-
"path": dir_path,
199-
"edits": [
200-
{
201-
"startLine": end_line,
202-
"startCol": end_col,
203-
"endLine": end_line,
204-
"endCol": end_col,
205-
"newText": insert_text
206-
}
207-
],
208-
"precondition_sha256": sha,
209-
"options": {"refresh": "immediate", "validate": "standard"}
210-
}
211-
}
212262

213-
try:
214-
reader, writer = await asyncio.open_connection(host, port)
215-
await do_handshake(reader)
216-
await write_frame(writer, json.dumps(apply_payload).encode("utf-8"))
217-
resp = await read_frame(reader)
218-
try:
219-
data = json.loads(resp.decode("utf-8", errors="ignore"))
220-
result = data.get("result", data) if isinstance(data, dict) else {}
221-
ok = bool(result.get("success", False))
222-
if ok:
223-
stats["applies"] = stats.get("applies", 0) + 1
224-
else:
225-
stats["apply_errors"] = stats.get("apply_errors", 0) + 1
226-
except Exception:
263+
apply_success = False
264+
for attempt in range(3):
265+
writer = None
266+
try:
267+
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
268+
await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
269+
await write_frame(writer, json.dumps(apply_payload).encode("utf-8"))
270+
resp = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
271+
try:
272+
data = json.loads(resp.decode("utf-8", errors="ignore"))
273+
result = data.get("result", data) if isinstance(data, dict) else {}
274+
ok = bool(result.get("success", False))
275+
if ok:
276+
stats["applies"] = stats.get("applies", 0) + 1
277+
apply_success = True
278+
break
279+
except Exception:
280+
# fall through to retry
281+
pass
282+
except Exception:
283+
# retry with backoff
284+
await asyncio.sleep(0.2 * (2 ** attempt) + random.uniform(0.0, 0.1))
285+
finally:
286+
if 'writer' in locals() and writer is not None:
287+
try:
288+
writer.close()
289+
await writer.wait_closed()
290+
except Exception:
291+
pass
292+
if not apply_success:
227293
stats["apply_errors"] = stats.get("apply_errors", 0) + 1
228-
writer.close()
229-
await writer.wait_closed()
230-
except Exception:
231-
stats["apply_errors"] = stats.get("apply_errors", 0) + 1
232294

233295
except Exception:
234296
pass
@@ -242,6 +304,7 @@ async def main():
242304
ap.add_argument("--unity-file", default=str(Path(__file__).resolve().parents[1] / "TestProjects" / "UnityMCPTests" / "Assets" / "Scripts" / "LongUnityScriptClaudeTest.cs"))
243305
ap.add_argument("--clients", type=int, default=10)
244306
ap.add_argument("--duration", type=int, default=60)
307+
ap.add_argument("--storm-count", type=int, default=1, help="Number of scripts to touch each cycle")
245308
args = ap.parse_args()
246309

247310
port = discover_port(args.project)
@@ -255,7 +318,7 @@ async def main():
255318
tasks.append(asyncio.create_task(client_loop(i, args.host, port, stop_time, stats)))
256319

257320
# Spawn reload churn task
258-
tasks.append(asyncio.create_task(reload_churn_task(args.project, stop_time, args.unity_file, args.host, port, stats)))
321+
tasks.append(asyncio.create_task(reload_churn_task(args.project, stop_time, args.unity_file, args.host, port, stats, storm_count=args.storm_count)))
259322

260323
await asyncio.gather(*tasks, return_exceptions=True)
261324
print(json.dumps({"port": port, "stats": stats}, indent=2))

0 commit comments

Comments
 (0)