Skip to content

Commit e77ec06

Browse files
authored
[https://nvbugs/5451296][fix] zmq nonblock bug with retry (#7019)
Signed-off-by: Superjomn <[email protected]>
1 parent 5acf213 commit e77ec06

File tree

4 files changed

+34
-10
lines changed

4 files changed

+34
-10
lines changed

tensorrt_llm/executor/ipc.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,36 @@ def put(self, obj: Any):
125125
# Send data without HMAC
126126
self.socket.send_pyobj(obj)
127127

128-
def put_noblock(self, obj: Any):
128+
def put_noblock(self,
129+
obj: Any,
130+
*,
131+
retry: int = 1,
132+
wait_time: float = 0.001):
133+
'''
134+
Put an object into the queue without blocking, and retry if the send fails.
135+
NOTE: It won't raise any error if the send fails.
136+
137+
Parameters:
138+
obj (Any): The object to send.
139+
retry (int): The number of times to retry sending the object.
140+
wait_time (float): The time to wait before retrying.
141+
'''
142+
143+
assert retry >= 0 and retry <= 10, "Retry must be between 0 and 10, adjust the wait_time if needed"
144+
129145
self.setup_lazily()
130146
with nvtx_range_debug("send", color="blue", category="IPC"):
131147
data = pickle.dumps(obj) # nosec B301
132148
if self.use_hmac_encryption:
133149
data = self._sign_data(data)
134-
self.socket.send(data, flags=zmq.NOBLOCK)
150+
try:
151+
self.socket.send(data, flags=zmq.NOBLOCK)
152+
except zmq.Again:
153+
if retry > 0:
154+
time.sleep(wait_time)
155+
self.put_noblock(obj, retry=retry - 1, wait_time=wait_time)
156+
else:
157+
logger.error(f"Failed to send object: {obj}")
135158

136159
async def put_async(self, obj: Any):
137160
self.setup_lazily()

tensorrt_llm/executor/proxy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def pre_shutdown(self):
349349

350350
# notify the workers to quit
351351
if all(not f.done() for f in self.mpi_futures):
352-
self.request_queue.put_noblock(None)
352+
self.request_queue.put_noblock(None, retry=4)
353353

354354
def shutdown(self):
355355
if not self.workers_started:

tensorrt_llm/llmapi/mpi_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ def mpi_future_callback(self, future):
435435
f"RemoteMpiCommSessionServer received all results, sending to client\n",
436436
"green")
437437
try:
438-
self.queue.put_noblock(self.results)
438+
self.queue.put_noblock(self.results, retry=2)
439439
except zmq.ZMQError as e:
440440
# The client could be shutdown first.
441441
if e.errno == zmq.EAGAIN:

tests/integration/test_lists/test-db/l0_a10.yml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ l0_a10:
3434
- test_e2e.py::test_openai_chat_example[pytorch] TIMEOUT (90)
3535
- test_e2e.py::test_trtllm_bench_request_rate_and_concurrency[enable_concurrency-]
3636
- test_e2e.py::test_trtllm_bench_invalid_token_pytorch[TinyLlama-1.1B-Chat-v1.0-TinyLlama-1.1B-Chat-v1.0]
37+
# llmapi
38+
- unittest/llmapi/test_llm_utils.py
39+
- unittest/llmapi/test_gc_utils.py
40+
- unittest/llmapi/test_reasoning_parser.py
41+
- unittest/llmapi/test_serialization.py
42+
- unittest/llmapi/test_utils.py
43+
- unittest/llmapi/test_llm_args.py
3744
- condition:
3845
ranges:
3946
system_gpu_count:
@@ -98,12 +105,6 @@ l0_a10:
98105
- unittest/bindings
99106
- unittest/test_model_runner_cpp.py
100107
- unittest/llmapi/test_build_cache.py
101-
- unittest/llmapi/test_llm_utils.py
102-
- unittest/llmapi/test_gc_utils.py
103-
- unittest/llmapi/test_reasoning_parser.py
104-
- unittest/llmapi/test_serialization.py
105-
- unittest/llmapi/test_utils.py
106-
- unittest/llmapi/test_llm_args.py
107108
- accuracy/test_cli_flow.py::TestGpt2::test_auto_dtype # 1 min
108109
- accuracy/test_cli_flow.py::TestGpt2::test_beam_search # 1 min
109110
- accuracy/test_cli_flow.py::TestGpt2::test_beam_search_large # 6 mins

0 commit comments

Comments
 (0)