Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions tensorrt_llm/executor/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,36 @@ def put(self, obj: Any):
# Send data without HMAC
self.socket.send_pyobj(obj)

def put_noblock(self,
                obj: Any,
                *,
                retry: int = 1,
                wait_time: float = 0.001):
    '''
    Put an object into the queue without blocking, and retry if the send fails.
    NOTE: It won't raise any error if the send ultimately fails; after the
    retries are exhausted the failure is only logged.

    Parameters:
        obj (Any): The object to send.
        retry (int): The number of times to retry sending the object.
        wait_time (float): The time (in seconds) to wait before retrying.

    Raises:
        ValueError: If ``retry`` is outside the range [0, 10].
    '''

    # Validate with a real exception: ``assert`` is stripped under ``-O``,
    # which would silently disable this guard.
    if not 0 <= retry <= 10:
        raise ValueError(
            "Retry must be between 0 and 10, adjust the wait_time if needed")

    self.setup_lazily()
    with nvtx_range_debug("send", color="blue", category="IPC"):
        # Serialize (and sign) exactly once; retries resend the same bytes.
        # The previous recursive retry re-ran setup, pickling and HMAC
        # signing on every attempt.
        data = pickle.dumps(obj)  # nosec B301
        if self.use_hmac_encryption:
            data = self._sign_data(data)
        for attempt in range(retry + 1):
            try:
                self.socket.send(data, flags=zmq.NOBLOCK)
                return
            except zmq.Again:
                # Receiver not ready (EAGAIN); back off before the next try.
                if attempt < retry:
                    time.sleep(wait_time)
        logger.error(f"Failed to send object: {obj}")

async def put_async(self, obj: Any):
self.setup_lazily()
Expand Down
2 changes: 1 addition & 1 deletion tensorrt_llm/executor/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def pre_shutdown(self):

# notify the workers to quit
if all(not f.done() for f in self.mpi_futures):
self.request_queue.put_noblock(None)
self.request_queue.put_noblock(None, retry=4)

def shutdown(self):
if not self.workers_started:
Expand Down
2 changes: 1 addition & 1 deletion tensorrt_llm/llmapi/mpi_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def mpi_future_callback(self, future):
f"RemoteMpiCommSessionServer received all results, sending to client\n",
"green")
try:
self.queue.put_noblock(self.results)
self.queue.put_noblock(self.results, retry=2)
except zmq.ZMQError as e:
# The client could be shutdown first.
if e.errno == zmq.EAGAIN:
Expand Down
13 changes: 7 additions & 6 deletions tests/integration/test_lists/test-db/l0_a10.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ l0_a10:
- test_e2e.py::test_openai_chat_example[pytorch] TIMEOUT (90)
- test_e2e.py::test_trtllm_bench_request_rate_and_concurrency[enable_concurrency-]
- test_e2e.py::test_trtllm_bench_invalid_token_pytorch[TinyLlama-1.1B-Chat-v1.0-TinyLlama-1.1B-Chat-v1.0]
# llmapi
- unittest/llmapi/test_llm_utils.py
- unittest/llmapi/test_gc_utils.py
- unittest/llmapi/test_reasoning_parser.py
- unittest/llmapi/test_serialization.py
- unittest/llmapi/test_utils.py
- unittest/llmapi/test_llm_args.py
- condition:
ranges:
system_gpu_count:
Expand Down Expand Up @@ -98,12 +105,6 @@ l0_a10:
- unittest/bindings
- unittest/test_model_runner_cpp.py
- unittest/llmapi/test_build_cache.py
- unittest/llmapi/test_llm_utils.py
- unittest/llmapi/test_gc_utils.py
- unittest/llmapi/test_reasoning_parser.py
- unittest/llmapi/test_serialization.py
- unittest/llmapi/test_utils.py
- unittest/llmapi/test_llm_args.py
- accuracy/test_cli_flow.py::TestGpt2::test_auto_dtype # 1 min
- accuracy/test_cli_flow.py::TestGpt2::test_beam_search # 1 min
- accuracy/test_cli_flow.py::TestGpt2::test_beam_search_large # 6 mins
Expand Down