Weaken num_new_engines into has_new_engines #938

Open
fzyzcjy wants to merge 4 commits into rollout_ft/20 from rollout_ft/21

@fzyzcjy (Collaborator) commented Apr 7, 2026

To prepare for multiple start() calls in one step.

@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request refactors the rollout engine tracking logic by replacing the integer-based num_new_engines with a boolean has_new_engines across the FSDP, Megatron, and Ray rollout modules. The start_engines method was also modified to return a tuple containing initialization handles and the count of new engines. Review feedback identifies several instances where docstrings, comments, and assertion messages were not updated to reflect these variable name and return type changes.
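
To illustrate the refactor summarized above, here is a minimal sketch of tracking new engines with a boolean flag instead of a count. The class `EngineTracker` and its `start()` signature are hypothetical and are not taken from this PR; only the names `has_new_engines` and `curr_num_new_engines` come from the diff. The sketch assumes the flag should stay set when `start()` is called more than once in a step.

```python
class EngineTracker:
    """Hypothetical stand-in for a rollout manager; not the PR's actual class."""

    def __init__(self) -> None:
        # Boolean flag replacing the former integer counter.
        self.has_new_engines = False

    def start(self, num_to_start: int) -> int:
        """Pretend to start engines and return the count for this call only."""
        curr_num_new_engines = num_to_start
        # OR-ing keeps the flag set across several start() calls in one step.
        self.has_new_engines = self.has_new_engines or curr_num_new_engines > 0
        return curr_num_new_engines


tracker = EngineTracker()
first = tracker.start(2)   # two engines started in this call
second = tracker.start(0)  # nothing to start in this call
```

In this sketch the per-call counts stay local to each call, while the shared flag is still set after the second call, which is the property a count overwritten on every call would not give.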

- logger.info(f"Recovered {self.num_new_engines} dead rollout engines (worker_type={self.worker_type})")
- assert self.num_new_engines == len(dead_indices), "num_new_engines does not match dead_indices length"
+ logger.info(f"Recovered {curr_num_new_engines} dead rollout engines (worker_type={self.worker_type})")
+ assert curr_num_new_engines == len(dead_indices), "curr_num_new_engines does not match dead_indices length"

medium

The assertion message still refers to num_new_engines. It should be updated to match the new variable name curr_num_new_engines for consistency.

Suggested change
assert curr_num_new_engines == len(dead_indices), "curr_num_new_engines does not match dead_indices length"

Comment on lines +198 to +199
+ def clear_updatable_has_new_engines(self):
+     # when fault tolerance is not enabled, we need to manually clear has_new_engines after update_weights

medium

The comment on line 199 still refers to num_new_engines. It should be updated to has_new_engines to reflect the recent changes.

Suggested change
def clear_updatable_has_new_engines(self):
    # when fault tolerance is not enabled, we need to manually clear has_new_engines after update_weights
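
One possible reading of this code comment, as a sketch: the flag records that engines were started, and a caller running without fault tolerance resets it once the weight update is done. Everything except the method name `clear_updatable_has_new_engines` and the attribute `has_new_engines` is an assumption here, including `RolloutManagerSketch`, `use_fault_tolerance`, and `after_update_weights`.

```python
class RolloutManagerSketch:
    """Hypothetical manager; only the two has_new_engines names come from the PR."""

    def __init__(self, use_fault_tolerance: bool) -> None:
        self.use_fault_tolerance = use_fault_tolerance  # assumed config switch
        self.has_new_engines = False

    def clear_updatable_has_new_engines(self) -> None:
        self.has_new_engines = False


def after_update_weights(manager: RolloutManagerSketch) -> None:
    """Assumed hook that runs once update_weights has finished."""
    if not manager.use_fault_tolerance:
        # Assumption: nothing else resets the flag in this mode, so clear it manually.
        manager.clear_updatable_has_new_engines()


manager = RolloutManagerSketch(use_fault_tolerance=False)
manager.has_new_engines = True
after_update_weights(manager)
```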

Comment on lines +57 to 62
+ def start_engines(self, port_cursors: PortCursors) -> tuple[list, int]:
      """Create Ray actors, allocate ports, and fire ``engine.init()`` without waiting.

-     Returns ``(init_handles, port_cursors)`` where *init_handles* is a list
+     Returns ``(init_handles, curr_num_new_engines)`` where *init_handles* is a list
      of Ray ObjectRefs and *port_cursors* maps node index -> next free port.
      """

medium

The docstring for start_engines is outdated. It incorrectly states that the method returns (init_handles, port_cursors), whereas it now returns (init_handles, curr_num_new_engines). Additionally, port_cursors is modified in-place rather than returned.
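
The distinction drawn here, a value that is returned versus an argument that is modified in place, can be sketched as follows. This is not the PR's implementation: `start_engines_sketch`, the `nodes` parameter, the string handles standing in for Ray ObjectRefs, and the `dict` alias for `PortCursors` are all assumptions made for illustration.

```python
# Assumed shape of PortCursors: node index -> next free port.
PortCursors = dict[int, int]


def start_engines_sketch(port_cursors: PortCursors, nodes: list[int]) -> tuple[list[str], int]:
    """Return (init_handles, curr_num_new_engines); port_cursors is advanced in place."""
    init_handles: list[str] = []
    for node in nodes:
        port = port_cursors[node]
        port_cursors[node] = port + 1  # in-place update, visible to the caller
        init_handles.append(f"node{node}:{port}")  # placeholder for a Ray ObjectRef
    curr_num_new_engines = len(init_handles)
    return init_handles, curr_num_new_engines


cursors = {0: 15000, 1: 15000}
handles, count = start_engines_sketch(cursors, [0, 0, 1])
```

After the call, the caller reads the engine count from the return value and finds the advanced ports in the same `cursors` object it passed in, which is why the docstring should not list `port_cursors` as a return value.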

Suggested change
def start_engines(self, port_cursors: PortCursors) -> tuple[list, int]:
    """Create Ray actors, allocate ports, and fire engine.init() without waiting.

    Returns (init_handles, curr_num_new_engines) where init_handles is a list
    of Ray ObjectRefs and curr_num_new_engines is the number of newly started engines.
    """
