Skip to content

Add queue-based orchestration for three service requests#948

Open
zhtshr wants to merge 8 commits intoModelTC:mainfrom
zhtshr:zht_dev
Open

Add queue-based orchestration for three service requests#948
zhtshr wants to merge 8 commits intoModelTC:mainfrom
zhtshr:zht_dev

Conversation

@zhtshr
Copy link
Contributor

@zhtshr zhtshr commented Mar 16, 2026

This pull request refactors the DataManager class and related threading logic in lightx2v/disagg/conn.py to better support multi-room (multi-session) operation, improve thread management, and enhance code clarity. The changes introduce per-room resource management, refactor thread lifecycle handling, and add a new ReqManager utility class for ZeroMQ-based message passing.

Key changes include:

Multi-room support and resource management:

  • Refactored DataManager to manage per-room resources such as data_args, threads, and events, enabling concurrent handling of multiple rooms/sessions. Initialization and cleanup for each room are now handled through new init and release methods.
  • Updated data transfer and synchronization methods to use per-room arguments and events, ensuring correct operation in multi-room scenarios. [1] [2] [3] [4] [5]

Thread lifecycle and event handling improvements:

  • Replaced global thread/event management with per-room thread pools and stop events. Added helper methods to start, register, and end threads for each room, allowing for safe cleanup and improved robustness. [1] [2] [3] [4] [5] [6] [7]

API and naming consistency:

  • Updated method signatures and variable names for clarity and consistency, such as using sender_data_ptrs and receiver_ptrs instead of ambiguous names. [1] [2]
  • Improved error handling and input validation, for example by requiring bootstrap_room in DataReceiver.

New utility class:

  • Added ReqManager, a utility class for sending and receiving Python objects over ZeroMQ, including support for non-blocking receives and automatic conversion of nested mappings to built-in types.

Minor improvements:

  • Three services utilize queues to maintain request states.
  • Dependencies between requests exist solely during two message communication phases, while message transmission is handled asynchronously.
  • All operations within the orange blocks support batch processing.

These changes collectively improve the scalability, maintainability, and clarity of the data disaggregation infrastructure.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the lightx2v data disaggregation infrastructure by introducing robust multi-room/multi-session capabilities and a queue-based request orchestration system. The core DataManager class has been refactored to manage resources and threads on a per-room basis, improving scalability and isolation. A new ReqManager facilitates inter-service communication via ZeroMQ, enabling asynchronous message passing. These changes collectively streamline the handling of multiple concurrent requests, improve thread safety, and lay the groundwork for more complex distributed processing workflows.

Highlights

  • Multi-room/Multi-session Support: The DataManager class has been extensively refactored to manage resources, data arguments, threads, and events on a per-room basis, enabling concurrent handling of multiple independent sessions.
  • Improved Thread Management: Global thread and event management has been replaced with per-room thread pools and stop events, allowing for safer cleanup and increased robustness in multi-threaded environments.
  • New Request Orchestration System: A queue-based request orchestration model has been implemented across the Encoder, Transformer, and Decoder services, facilitated by a new ControllerService and ReqManager.
  • ZeroMQ-based Message Passing: A new ReqManager utility class was introduced to handle sending and receiving Python objects over ZeroMQ, including support for non-blocking operations and automatic type conversion.
  • API and Naming Consistency: Method signatures and variable names were updated for improved clarity and consistency, such as using sender_data_ptrs and receiver_ptrs.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • lightx2v/disagg/conn.py
    • Added Mapping to imports.
    • Added REQUEST_POLLING_PORT constant.
    • Refactored DataManager constructor to remove args parameter and initialize per-room data structures.
    • Introduced init and release methods to DataManager for per-room resource management and cleanup.
    • Modified register_buffer_to_engine to accept a room parameter.
    • Added prepare_room_threads, register_room_thread, and end_room_threads methods for managing thread lifecycles per room.
    • Updated send_data method signature to include room and renamed parameters for clarity.
    • Modified sync_status_to_transformer_endpoint to use room-specific receiver_engine_rank.
    • Refactored start_phase1_encode_thread, start_phase1_transformer_thread, start_phase2_transformer_thread, and start_phase2_decode_thread to accept a room parameter, use room-specific events, and register threads.
    • Added end_phase1_encode_thread, end_phase1_transformer_thread, end_phase2_transformer_thread, and end_phase2_decode_thread for thread cleanup.
    • Modified thread loops to check stop_event.is_set() and handle zmq.Again exceptions for non-blocking receives.
    • Updated enqueue_request to use room-specific transfer_events.
    • Modified DataReceiver constructor to require bootstrap_room and use room-specific data_args for server URLs.
    • Introduced ReqManager class for ZeroMQ-based object sending and non-blocking receiving.
  • lightx2v/disagg/examples/wan_i2v_service.py
    • Imported ControllerService.
    • Added encoder_stop_event, transformer_stop_event, decoder_stop_event for graceful service shutdown.
    • Modified run_encoder, run_transformer, run_decoder functions to initialize services without config and call exec_request with stop_event.
    • Removed release_memory calls from service run functions.
    • Added run_controller function to initialize ControllerService, add requests, and set stop events for other services.
    • Integrated controller_thread into the main execution flow.
  • lightx2v/disagg/examples/wan_t2v_service.py
    • Imported ControllerService.
    • Added encoder_stop_event, transformer_stop_event, decoder_stop_event for graceful service shutdown.
    • Modified service run functions to initialize services without config and call exec_request with stop_event.
    • Removed release_memory calls from service run functions.
    • Added run_controller function to initialize ControllerService, add requests, and set stop events for other services.
    • Integrated controller_thread into the main execution flow.
  • lightx2v/disagg/mooncake.py
    • Updated the default MOONCAKE_CONFIG_PATH in MooncakeTransferEngineConfig.load_from_env.
  • lightx2v/disagg/protocol.py
    • Changed the type of bootstrap_room in AllocationRequest from str to int.
  • lightx2v/disagg/services/base.py
    • Simplified BaseService constructor by removing the config parameter and abstract methods.
  • lightx2v/disagg/services/controller.py
    • Added a new ControllerService to manage and dispatch requests to other services using ZeroMQ.
  • lightx2v/disagg/services/decoder.py
    • Imported deque, Dict, REQUEST_POLLING_PORT, ReqManager.
    • Refactored DecoderService constructor to initialize ReqManager, data_mgr, and data_receiver as dictionaries for multi-room support.
    • Introduced init method to handle per-request configuration and DataManager initialization.
    • Modified alloc_memory to use room-specific _rdma_buffers.
    • Updated process method to accept config and retrieve room-specific buffers and receiver.
    • Removed direct polling loop from process and introduced exec_request for queue-based request processing.
    • exec_request manages req_queue, waiting_queue, and exec_queue for asynchronous request handling, including error handling and graceful shutdown.
    • Modified release_memory and added release method for room-specific resource cleanup.
  • lightx2v/disagg/services/encoder.py
    • Imported deque, Dict, REQUEST_POLLING_PORT, ReqManager.
    • Refactored EncoderService constructor, introduced init method, modified alloc_memory and process for multi-room support.
    • Added exec_request for queue-based request processing, managing req_queue, exec_queue, and complete_queue.
    • Modified release_memory and added release method for room-specific resource cleanup.
  • lightx2v/disagg/services/transformer.py
    • Imported deque, REQUEST_POLLING_PORT, ReqManager.
    • Refactored TransformerService constructor, introduced init method, modified alloc_memory and process for multi-room support.
    • Added exec_request for queue-based request processing, managing req_queue, waiting_queue, exec_queue, and complete_queue.
    • Modified release_memory and added release method for room-specific resource cleanup.
Activity
  • No specific activity (comments, reviews, progress updates) has been recorded for this pull request since its creation.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the data management and service architecture to support multi-tenant processing, introducing a 'room' concept for resource isolation and lifecycle management. Key changes include updating the DataManager to handle room-specific data arguments, threads, and events, and adding init and release methods for room lifecycle. A new ReqManager class is introduced for ZeroMQ-based request communication. The EncoderService, TransformerService, and DecoderService are modified to be stateless, receiving configuration per request, and now include exec_request methods for processing requests in a loop, managing their own DataSender/DataReceiver instances per room. A ControllerService is added to dispatch requests to these services. Review comments highlight a critical issue with using time.sleep for synchronization in the controller, a high-severity portability issue with a hardcoded user-specific path in mooncake.py, a medium-severity resource leak potential due to unhandled thread join timeouts, and a medium-severity code duplication issue in ReqManager's socket handling.

self.req_mgr.send(bootstrap_addr, REQUEST_POLLING_PORT + 2, config)
self.logger.info("Request added to controller queue and dispatched to services")

time.sleep(10) # Sleep briefly to allow services to process the request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Using time.sleep(10) to wait for services to process requests is a brittle and unreliable synchronization mechanism. This can lead to race conditions and makes the system's behavior dependent on timing. A more robust solution would involve a proper synchronization mechanism, such as having services send a completion signal back to the controller. For an example script, this might be acceptable as a temporary solution, but it's a critical flaw for any production-level code.

@staticmethod
def load_from_env() -> "MooncakeTransferEngineConfig":
config_file_path = os.getenv("MOONCAKE_CONFIG_PATH", "/data/nvme1/yongyang/FL/LightX2V/configs/mooncake_config.json")
config_file_path = os.getenv("MOONCAKE_CONFIG_PATH", "/root/zht/LightX2V/configs/mooncake_config.json")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The default path for MOONCAKE_CONFIG_PATH is user-specific (/root/zht/...). Hardcoding user-specific paths makes the code less portable and will likely cause it to fail for other developers or in different environments. It's better to remove the default value and rely on the environment variable being set, as the code already checks for None and raises an error.

Suggested change
config_file_path = os.getenv("MOONCAKE_CONFIG_PATH", "/root/zht/LightX2V/configs/mooncake_config.json")
config_file_path = os.getenv("MOONCAKE_CONFIG_PATH")

Comment on lines +185 to +187
for t in threads:
if t.is_alive():
t.join(timeout=1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The join call has a timeout of 1 second, but there's no handling for the case where a thread doesn't terminate within this time. This could leave zombie threads running and lead to resource leaks. It would be more robust to check if the thread is still alive after the join and log a warning.

Suggested change
for t in threads:
if t.is_alive():
t.join(timeout=1.0)
for t in threads:
if t.is_alive():
t.join(timeout=1.0)
if t.is_alive():
logger.warning("Thread %s for room %s did not terminate in time.", t.name, room)

Comment on lines +573 to +590
def receive(self, port: int):
socket = self.pull_sockets.get(port)
if socket is None:
socket = self.context.socket(zmq.PULL)
socket.bind(f"tcp://*:{port}")
self.pull_sockets[port] = socket
return socket.recv_pyobj()

def receive_non_block(self, port: int):
socket = self.pull_sockets.get(port)
if socket is None:
socket = self.context.socket(zmq.PULL)
socket.bind(f"tcp://*:{port}")
self.pull_sockets[port] = socket
try:
return socket.recv_pyobj(flags=zmq.NOBLOCK)
except zmq.Again:
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's duplicated code for creating and caching pull sockets in receive and receive_non_block. This can be refactored into a private helper method to improve maintainability and reduce redundancy.

Suggested change
def receive(self, port: int):
socket = self.pull_sockets.get(port)
if socket is None:
socket = self.context.socket(zmq.PULL)
socket.bind(f"tcp://*:{port}")
self.pull_sockets[port] = socket
return socket.recv_pyobj()
def receive_non_block(self, port: int):
socket = self.pull_sockets.get(port)
if socket is None:
socket = self.context.socket(zmq.PULL)
socket.bind(f"tcp://*:{port}")
self.pull_sockets[port] = socket
try:
return socket.recv_pyobj(flags=zmq.NOBLOCK)
except zmq.Again:
return None
def _get_pull_socket(self, port: int) -> zmq.Socket:
socket = self.pull_sockets.get(port)
if socket is None:
socket = self.context.socket(zmq.PULL)
socket.bind(f"tcp://*:{port}")
self.pull_sockets[port] = socket
return socket
def receive(self, port: int):
socket = self._get_pull_socket(port)
return socket.recv_pyobj()
def receive_non_block(self, port: int):
socket = self._get_pull_socket(port)
try:
return socket.recv_pyobj(flags=zmq.NOBLOCK)
except zmq.Again:
return None

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant