|
14 | 14 | from vllm.sampling_params import SamplingParams |
15 | 15 | from vllm.v1.engine.async_llm import AsyncLLM |
16 | 16 | from vllm.v1.engine.llm_engine import LLMEngine |
| 17 | +from vllm.v1.executor import multiproc_executor as multiproc_executor_module |
17 | 18 | from vllm.v1.executor.abstract import Executor |
18 | 19 | from vllm.v1.executor.multiproc_executor import MultiprocExecutor |
19 | 20 | from vllm.v1.executor.uniproc_executor import ( |
@@ -43,6 +44,50 @@ def test_supports_async_scheduling_multiproc_executor(): |
43 | 44 | assert MultiprocExecutor.supports_async_scheduling() is True |
44 | 45 |
|
45 | 46 |
|
| 47 | +class _FakeClock: |
| 48 | + def __init__(self) -> None: |
| 49 | + self.now = 0.0 |
| 50 | + |
| 51 | + def time(self) -> float: |
| 52 | + return self.now |
| 53 | + |
| 54 | + def sleep(self, seconds: float) -> None: |
| 55 | + self.now += seconds |
| 56 | + |
| 57 | + |
| 58 | +class _FakeProcess: |
| 59 | + def __init__(self, clock: _FakeClock, exits_at: float) -> None: |
| 60 | + self.clock = clock |
| 61 | + self.exits_at = exits_at |
| 62 | + self.terminate_called = False |
| 63 | + |
| 64 | + def is_alive(self) -> bool: |
| 65 | + return self.clock.time() < self.exits_at |
| 66 | + |
| 67 | + def terminate(self) -> None: |
| 68 | + self.terminate_called = True |
| 69 | + |
| 70 | + |
| 71 | +@pytest.mark.parametrize( |
| 72 | + ("timeout", "exits_at", "expected_terminate"), |
| 73 | + [ |
| 74 | + pytest.param(6, 5, False, id="worker-exits-before-timeout"), |
| 75 | + pytest.param(6, 7, True, id="worker-exceeds-timeout"), |
| 76 | + ], |
| 77 | +) |
| 78 | +def test_multiproc_executor_worker_termination_timeout( |
| 79 | + monkeypatch, timeout, exits_at, expected_terminate |
| 80 | +): |
| 81 | + monkeypatch.setenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", str(timeout)) |
| 82 | + clock = _FakeClock() |
| 83 | + monkeypatch.setattr(multiproc_executor_module.time, "time", clock.time) |
| 84 | + monkeypatch.setattr(multiproc_executor_module.time, "sleep", clock.sleep) |
| 85 | + executor = MultiprocExecutor.__new__(MultiprocExecutor) |
| 86 | + proc = _FakeProcess(clock, exits_at=exits_at) |
| 87 | + executor._ensure_worker_termination([proc]) |
| 88 | + assert proc.terminate_called is expected_terminate |
| 89 | + |
| 90 | + |
46 | 91 | class CustomMultiprocExecutor(MultiprocExecutor): |
47 | 92 | def collective_rpc( |
48 | 93 | self, |
|
0 commit comments