diff --git a/tensorrt_llm/executor/ipc.py b/tensorrt_llm/executor/ipc.py index 2a86f50d650..5d45ebe4c12 100644 --- a/tensorrt_llm/executor/ipc.py +++ b/tensorrt_llm/executor/ipc.py @@ -125,13 +125,36 @@ def put(self, obj: Any): # Send data without HMAC self.socket.send_pyobj(obj) - def put_noblock(self, obj: Any): + def put_noblock(self, + obj: Any, + *, + retry: int = 1, + wait_time: float = 0.001): + ''' + Put an object into the queue without blocking, and retry if the send fails. + NOTE: zmq.Again (send would block) is swallowed and only logged once retries are exhausted; other errors still propagate. + + Parameters: + obj (Any): The object to send. + retry (int): The number of times to retry sending the object. + wait_time (float): The time to wait before retrying. + ''' + + assert retry >= 0 and retry <= 10, "Retry must be between 0 and 10, adjust the wait_time if needed" + self.setup_lazily() with nvtx_range_debug("send", color="blue", category="IPC"): data = pickle.dumps(obj) # nosec B301 if self.use_hmac_encryption: data = self._sign_data(data) - self.socket.send(data, flags=zmq.NOBLOCK) + try: + self.socket.send(data, flags=zmq.NOBLOCK) + except zmq.Again: + if retry > 0: + time.sleep(wait_time) + self.put_noblock(obj, retry=retry - 1, wait_time=wait_time) + else: + logger.error(f"Failed to send object: {obj}") async def put_async(self, obj: Any): self.setup_lazily() diff --git a/tensorrt_llm/executor/proxy.py b/tensorrt_llm/executor/proxy.py index 78a0d076200..e5226e71523 100644 --- a/tensorrt_llm/executor/proxy.py +++ b/tensorrt_llm/executor/proxy.py @@ -349,7 +349,7 @@ def pre_shutdown(self): # notify the workers to quit if all(not f.done() for f in self.mpi_futures): - self.request_queue.put_noblock(None) + self.request_queue.put_noblock(None, retry=4) def shutdown(self): if not self.workers_started: diff --git a/tensorrt_llm/llmapi/mpi_session.py b/tensorrt_llm/llmapi/mpi_session.py index 7a25fa57f3d..f361b977b7d 100644 --- a/tensorrt_llm/llmapi/mpi_session.py +++ b/tensorrt_llm/llmapi/mpi_session.py @@ -435,7 +435,7 @@ 
def mpi_future_callback(self, future): f"RemoteMpiCommSessionServer received all results, sending to client\n", "green") try: - self.queue.put_noblock(self.results) + self.queue.put_noblock(self.results, retry=2) except zmq.ZMQError as e: # The client could be shutdown first. if e.errno == zmq.EAGAIN: diff --git a/tests/integration/test_lists/test-db/l0_a10.yml b/tests/integration/test_lists/test-db/l0_a10.yml index 891649e5b9f..2a5e590dd9e 100644 --- a/tests/integration/test_lists/test-db/l0_a10.yml +++ b/tests/integration/test_lists/test-db/l0_a10.yml @@ -34,6 +34,13 @@ l0_a10: - test_e2e.py::test_openai_chat_example[pytorch] TIMEOUT (90) - test_e2e.py::test_trtllm_bench_request_rate_and_concurrency[enable_concurrency-] - test_e2e.py::test_trtllm_bench_invalid_token_pytorch[TinyLlama-1.1B-Chat-v1.0-TinyLlama-1.1B-Chat-v1.0] + # llmapi + - unittest/llmapi/test_llm_utils.py + - unittest/llmapi/test_gc_utils.py + - unittest/llmapi/test_reasoning_parser.py + - unittest/llmapi/test_serialization.py + - unittest/llmapi/test_utils.py + - unittest/llmapi/test_llm_args.py - condition: ranges: system_gpu_count: @@ -98,12 +105,6 @@ l0_a10: - unittest/bindings - unittest/test_model_runner_cpp.py - unittest/llmapi/test_build_cache.py - - unittest/llmapi/test_llm_utils.py - - unittest/llmapi/test_gc_utils.py - - unittest/llmapi/test_reasoning_parser.py - - unittest/llmapi/test_serialization.py - - unittest/llmapi/test_utils.py - - unittest/llmapi/test_llm_args.py - accuracy/test_cli_flow.py::TestGpt2::test_auto_dtype # 1 min - accuracy/test_cli_flow.py::TestGpt2::test_beam_search # 1 min - accuracy/test_cli_flow.py::TestGpt2::test_beam_search_large # 6 mins