Conversation

AdilZouitine (Member) commented:

Address this comment: #644 (comment). It's critical and needs careful review.

cc @helper2424 @michel-aractingi


@aliberts (Collaborator) left a comment:

LGTM, thank you for taking care of this!

Comment on lines 26 to 30
The class exposes a *shutdown_event* attribute that is set when a shutdown
signal is received. A counter tracks how many shutdown signals have been
caught. On the second signal the process exits with status 1, mirroring the
previous behaviour of *setup_process_handlers* but without relying on a
module-level mutable counter.
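
For illustration, a minimal sketch of the behaviour this docstring describes; the class and attribute names follow the docstring, but the body is an assumption rather than the PR's actual implementation:

import multiprocessing
import signal
import sys
import threading

class ProcessSignalHandler:
    """Sketch: set a shutdown event on the first signal, exit(1) on the second."""

    def __init__(self, use_threads: bool):
        # A threading.Event suffices for threads; separate processes need a
        # multiprocessing.Event so that children can observe it.
        self.shutdown_event = threading.Event() if use_threads else multiprocessing.Event()
        self._counter = 0
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._handler)

    def _handler(self, signum, frame):
        self._counter += 1
        self.shutdown_event.set()  # first signal: request a graceful shutdown
        if self._counter > 1:
            sys.exit(1)            # second signal: force the exit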

Collaborator:

Really cool!

@helper2424 (Contributor) commented:

Looks great. 🚀

@imstevenpmwork (Collaborator) left a comment:

Some comments


return shutdown_event
def _register_handlers(self): # noqa: D401 – not a docstring oriented helper

Collaborator:

Is this noqa still needed?

@@ -546,7 +547,7 @@ def send_transitions(

     # Setup process handlers to handle shutdown signal
     # But use shutdown event from the main process
-    setup_process_handlers(False)
+    _ = ProcessSignalHandler(False)

Collaborator:

I would pass the argument explicitly as use_threads=False (assuming this call is still needed, given the comment above).
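
For instance, assuming the constructor's keyword argument is indeed named use_threads:

_ = ProcessSignalHandler(use_threads=False)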

# On a second Ctrl-C (or any supported signal) *force* the exit to
# mimic the previous behaviour while giving the caller one chance to
# shutdown gracefully.
if self._counter > 1:

Collaborator:

Any strong reason why you need a second signal to shut down?


Contributor:

The problem is that scripts sometimes get stuck and don't stop, or stop very slowly (over minutes). We want to kill the process immediately if the user sends Ctrl+C several times.


Collaborator:

But then why ignore the first Ctrl+C?


Contributor:

We don't ignore it.

The flow is as follows: you press Ctrl+C once, the shutdown event is set, and the process stops gracefully. It writes logs, stops servers, closes connections, and so on. That is how it should work, it is how the Actor-Learner works now in 90% of cases, and it is how we want it to work: the default flow.

But sometimes something happens, maybe a bug, an issue in PyTorch, or an issue in the multiprocessing setup, and some of the processes or threads get stuck, for minutes or forever. In that case you will be sitting and watching a locked terminal. So, like every user, you will press Ctrl+C several times. We recognize that and kill the process forcibly.

Without any signal handling we would not be able to stop the process (or processes) at all; without this trick we would not be able to stop processes in the case of bugs or breaking changes.


Collaborator:

Smelly IMHO


Contributor:

@imstevenpmwork what, in your opinion, is the best way to solve this?


Collaborator:

I believe the reason you guys need to ignore the first CTRL+C (but set the shutdown event) is to ensure synchronization between processes. If Process A terminates immediately, other processes (B and C) won't be able to check the shutdown_event status reliably, leading to undefined behavior.

  1. Process A: Registers signals and creates the shutdown event
  2. Processes B & C: Continuously monitor shutdown_event.is_set()

If Process A dies during the first signal, Processes B and C can't check its status. The delayed termination you observe likely occurs because the system relies on garbage collection to clean up these processes.

I think, for example, that the approach below would have the same effect as ignoring the first signal:

self.shutdown_event.set()
time.sleep(10)  # Give Processes B & C time to detect the event and shutdown cleanly
sys.exit(1) # Process B & C had the time to respond to the shutdown signal before Process A terminates

However, in my opinion, this would not be a good solution either. The best way to proceed is to have rigorous thread/process synchronization and communication, which is not straightforward.
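
A minimal sketch of the synchronization pattern under discussion, using a plain multiprocessing.Event; the names here are illustrative, not from the PR:

import multiprocessing as mp
import time

def worker(shutdown_event):
    # Processes B & C: poll the shared event and shut down cleanly once it is set.
    while not shutdown_event.is_set():
        time.sleep(0.1)  # placeholder for real work
    print("worker exiting cleanly")

if __name__ == "__main__":
    shutdown_event = mp.Event()  # Process A creates the shared event
    workers = [mp.Process(target=worker, args=(shutdown_event,)) for _ in range(2)]
    for p in workers:
        p.start()
    time.sleep(1)
    shutdown_event.set()      # signal the children...
    for p in workers:
        p.join(timeout=10)    # ...and wait for them instead of exiting immediately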


Contributor:

I see your point. This could be good too, yes.

Just to give the context of how we arrived at the current solution:
1 - We started with just shutdown_event.set() at the beginning, which worked perfectly in the multithreading version.
2 - After this we added the multiprocessing version (there are two options here: fork and spawn). At some point, when we had bugs in the flow or some internal implementation of the multiprocessing queues got stuck, terminals would simply freeze, and you could sit there pressing Ctrl+C an infinite number of times. So we came up with the idea of catching such cases and forcibly closing all processes.

Theoretically, yes, we could always do sys.exit(1). But in that case we would never know that an issue with stuck processes exists in the system.


Contributor:

Also, it seems the system will become more complex: we will add planners and supervisors, we will add locomotion parts, and probably remote inference as the default way to produce actions. So it would be good to have a solid base system.

@@ -139,7 +139,8 @@ def actor_cli(cfg: TrainRLServerPipelineConfig):
     init_logging(log_file=log_file, display_pid=display_pid)
     logging.info(f"Actor logging initialized, writing to {log_file}")

-    shutdown_event = setup_process_handlers(use_threads(cfg))
+    process_handler = ProcessSignalHandler(use_threads(cfg))
+    shutdown_event = process_handler.shutdown_event

Collaborator:

It seems process_handler is used only once, on the next line. Nit:

shutdown_event = ProcessSignalHandler(use_threads(cfg)).shutdown_event

Same for other instances

@@ -673,7 +674,7 @@ def start_learner(
     # Setup process handlers to handle shutdown signal
     # But use shutdown event from the main process
     # Return back for MP
-    setup_process_handlers(False)
+    _ = ProcessSignalHandler(False)

Collaborator:

_ = ProcessSignalHandler(False): is this really needed? We don't seem to use shutdown_event here. It might register the signal handler permanently on the process though, so I can't say for sure. Worth testing whether you can just get rid of this 😄


Contributor:

It's required here. If the processes get stuck, it allows us to kill them. Don't remove it.


Contributor:

Whenever you create a new process, via fork or by spawning it, you need to register all the handlers for it one more time. If you don't, the handlers are not under our control. Also, we want to track the shutdown event from the main process to keep the system in sync. Before dropping any parts here, it would be good to check manually that everything works as expected. We had tons of issues with this before: the scripts would get stuck forever, locking terminals.
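
A hypothetical sketch of that re-registration: a spawned child starts with default signal handlers, so its entry point installs its own, while the shutdown event it watches still comes from the main process:

import signal
import time

def child_entry(shutdown_event):
    # A spawned child runs a fresh interpreter with default signal
    # dispositions, so handlers have to be installed again inside it.
    def on_signal(signum, frame):
        shutdown_event.set()  # stay in sync with the main process's event
    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)
    while not shutdown_event.is_set():
        time.sleep(0.1)  # placeholder for the child's real work loop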


@imstevenpmwork (Collaborator) commented on Jun 11, 2025:

I agree with this last point. One follow-up question; we do something like:

def start_learner(
    parameters_queue: Queue,
    transition_queue: Queue,
    interaction_message_queue: Queue,
    shutdown_event: any,  # Event,
    cfg: TrainRLServerPipelineConfig,
):
    """
    ...
    """
    if not use_threads(cfg):
        # Create a process-specific log file
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"learner_process_{os.getpid()}.log")

        # Initialize logging with explicit log file
        init_logging(log_file=log_file, display_pid=True)
        logging.info("Learner server process logging initialized")

        # Setup process handlers to handle shutdown signal
        # But use shutdown event from the main process
        # Return back for MP
        _ = ProcessSignalHandler(False)

    service = learner_service.LearnerService(
        shutdown_event=shutdown_event,
        parameters_queue=parameters_queue,
        seconds_between_pushes=cfg.policy.actor_learner_config.policy_parameters_push_frequency,
        transition_queue=transition_queue,
        interaction_message_queue=interaction_message_queue,
    )

What's the rationale for passing the function's shutdown_event to learner_service.LearnerService, rather than the shutdown_event from _ = ProcessSignalHandler(False)?


Member Author:

Done in 88d63e1


Contributor:

We are looking for the event from the main process, which is different from the events in spawned processes. I strongly suggest trying it, because I did that during my experiments; you will see how it works. Also, for now all services should stop correctly, but you can try locking one of the subprocesses and checking the behavior.


Contributor:

Done in 88d63e1

This is a mistake. Please don't remove that part, or let's drop multiprocessing support, because it won't work correctly.


Contributor:

Also, if you have a chance to experiment with it: process creation on Linux/BSD has several modes, fork and spawn, and they work differently. For now spawning should be used, but as I said before, the Python API could change.
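
For reference, the start method is chosen once at startup; "fork" copies the parent's state, including installed Python-level signal handlers, while "spawn" launches a fresh interpreter in which nothing is inherited:

import multiprocessing as mp

if __name__ == "__main__":
    # "spawn" starts a clean interpreter per child (handlers must be
    # re-registered there); "fork" duplicates the parent, handlers included.
    mp.set_start_method("spawn")  # call once, before any Process is created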

@AdilZouitine merged commit 1bf063a into user/adil-zouitine/2025-1-7-port-hil-serl-new on Jun 11, 2025. 7 checks passed.
@AdilZouitine deleted the user/azouitine/6-11-2025-port-hil-refacto-process-signal-handler branch on June 11, 2025 at 16:19.