Suggest a worker-based flow cancellation strategy #19938
base: main
Conversation
desertaxle left a comment
This looks like a solid plan! I left some comments and questions that I had while reading through.
It would be nice to have a section of potential/rejected alternatives. I think having 1 or 2 other implementations to compare against helps to give us confidence that we're choosing the correct approach.
> 2. **WebSocket Events**: Subscribe to `prefect.flow-run.Cancelling` events for real-time detection of new cancellations.
>
> No continuous polling is needed—the startup poll handles the restart case, and WebSocket events handle the steady-state case.
The Runner has a fallback polling mechanism for cancellation if there are issues with the websocket. We might want something similar for the worker.
I was considering that. I had questions around the load that polling would put on the server. Maybe this is a non-issue, or we could poll at a slower cadence since this is a fallback to the fallback.
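Roughly, the fallback could look like the sketch below. All of the `worker.*` helpers and the interval constant are hypothetical placeholders for whatever the implementation ends up exposing; the point is only that this loop can run at a much slower cadence than the Runner's, since websockets stay the primary path.

```python
import asyncio

# Assumed value for illustration; no such setting exists yet.
FALLBACK_POLL_INTERVAL_SECONDS = 120


async def fallback_cancellation_poll(worker) -> None:
    """Low-frequency safety net in case websocket cancellation events are missed."""
    while not worker.is_shutting_down():  # hypothetical helper
        # Hypothetical helper: query the API for CANCELLING runs this worker manages.
        for flow_run in await worker.query_cancelling_flow_runs():
            # Hypothetical helper: same code path the websocket handler would use.
            await worker.handle_cancellation(flow_run)
        await asyncio.sleep(FALLBACK_POLL_INTERVAL_SECONDS)
```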
> 1. Parse `infrastructure_pid` to verify it's infrastructure this worker can manage
> 2. Calculate time remaining until grace period expires based on `state.timestamp`
> 3. Schedule an async task to check again after the remaining time
If the grace period has already passed, we can kill the infrastructure immediately, right?
Also, how should the worker handle a requested shutdown when there are scheduled cancellation tasks?
That's what I was thinking. I think the async task here is just to decouple the shutdown work from the event. The proposed code looks like the async task will sleep for the remaining time, which would be 0 in the event that the grace period has already passed.
As for scheduled cancellation tasks, I would think it should wait for those. The reason for the wait is to allow for cancellation_hooks to run. We could cancel immediately if the flow does not have cancellation_hooks. This is something to consider when we implement this.
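A minimal sketch of that scheduling idea, assuming a hypothetical `worker.force_cancel` helper that re-fetches state and kills infrastructure if the run is still `CANCELLING`. The `max(0, ...)` makes the already-expired case collapse to an immediate kill, and tracking the tasks in a set would let a worker shutdown await them so cancellation hooks still get a chance to run.

```python
import asyncio
from datetime import datetime, timezone


async def _force_cancel_after_grace(worker, flow_run, grace_period_seconds: float) -> None:
    # Assumes state.timestamp is timezone-aware.
    elapsed = (datetime.now(timezone.utc) - flow_run.state.timestamp).total_seconds()
    # If the grace period has already passed, this sleeps for 0 and proceeds immediately.
    await asyncio.sleep(max(0.0, grace_period_seconds - elapsed))
    await worker.force_cancel(flow_run.id)  # hypothetical: re-fetch state, kill if still CANCELLING


def schedule_force_cancellation(worker, flow_run, grace_period_seconds: float) -> asyncio.Task:
    task = asyncio.create_task(_force_cancel_after_grace(worker, flow_run, grace_period_seconds))
    # Track the task so a worker shutdown can await outstanding cancellations before exiting.
    worker._cancellation_tasks.add(task)  # hypothetical attribute
    task.add_done_callback(worker._cancellation_tasks.discard)
    return task
```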
> 1. **Startup Poll**: On worker startup, query the API for any flow runs in `CANCELLING` state belonging to this work pool. This catches orphaned flow runs from before the worker started (e.g., after a worker restart).
>
> 2. **WebSocket Events**: Subscribe to `prefect.flow-run.Cancelling` events for real-time detection of new cancellations.
It'd be good to specify the scope of the polling/subscription. Is it for all flow runs in the work pool? Only the flow runs from the work queues the worker is polling?
I don't have much context for how the worker currently polls for work, but I would think we would only want cancellation events from any source where the worker would initiate work. So probably work queues, if we can filter down to that without server-side changes to start.
My naive thought was that there are likely fewer cancellation events, so if we had to do client-side filtering, that seems like a reasonable trade-off.
My primary concern is adding load to the server.
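For the client-side filtering, something like the sketch below is what I have in mind. The payload shape (plain dicts with related-resource labels) is an assumption rather than the actual event schema, and the queue names would come from whatever the worker is already configured to poll.

```python
def is_relevant_cancellation(event: dict, work_queue_names: set[str]) -> bool:
    """Client-side filter: only act on cancellation events for queues this worker polls."""
    # Assumed payload shape; the real related-resource labels may differ.
    for related in event.get("related", []):
        if related.get("prefect.resource.role") == "work-queue":
            return related.get("prefect.resource.name") in work_queue_names
    return False
```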
> - Set a longer grace period to give the Runner more time
> - Set grace period to `-1` to disable worker-side cancellation entirely (accepting that stuck flows may remain in `CANCELLING` indefinitely)
>
> ### Duplicate Cancellation Attempts
Is there any consideration needed for scenarios where more than one worker is running for a work pool?
I think in this case we would just duplicate the work. A leasing system would be ideal, but to keep this all client-side, I don't see harm in multiple workers trying to delete the same thing. We will just need to be diligent to ensure that deleting something that is already deleted is still marked as a successful cancellation.
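Something like this sketch, where "already gone" is treated as success. The `InfrastructureNotFound` import mirrors the exception Prefect raises today when infrastructure can't be found, but treat that and the simplified `kill_infrastructure` call as assumptions:

```python
from prefect.exceptions import InfrastructureNotFound  # assumed import path


async def kill_and_confirm(worker, flow_run, infrastructure_pid: str) -> None:
    """Kill infrastructure, treating 'already gone' as success so concurrent workers don't conflict."""
    try:
        await worker.kill_infrastructure(infrastructure_pid)  # simplified signature
    except InfrastructureNotFound:
        # Another worker (or the platform itself) already removed it;
        # that's still a successful cancellation.
        pass
    await worker.mark_flow_run_as_cancelled(flow_run)  # hypothetical helper
```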
> 2. Calculate time remaining until grace period expires based on `state.timestamp`
> 3. Schedule an async task to check again after the remaining time
> 4. When the task fires, re-fetch the flow run state
> 5. If still `CANCELLING`, call `kill_infrastructure()` and mark as `CANCELLED`
How should we handle scenarios where kill_infrastructure fails?
This would not be a new failure case. This problem currently exists for Runner cancellations, so I think the current error handling would still be relevant. We may need to mirror it into the worker code. If the worker calling this gets an exception, we should raise the exception and log it as an error while keeping the flow in the `CANCELLING` state.
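Roughly like this sketch (logger and helper names are placeholders; the key points are logging, re-raising, and leaving the run in `CANCELLING`):

```python
import logging

logger = logging.getLogger(__name__)


async def attempt_force_cancel(worker, flow_run, infrastructure_pid: str) -> None:
    try:
        await worker.kill_infrastructure(infrastructure_pid)  # simplified signature
    except Exception as exc:
        # Log and re-raise; the flow run intentionally stays in CANCELLING so the
        # failure is visible and a later attempt (or manual intervention) can retry.
        logger.error("Failed to kill infrastructure for flow run %s: %s", flow_run.id, exc)
        raise
    await worker.mark_flow_run_as_cancelled(flow_run)  # hypothetical helper
```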
> | Setting | Default | Description |
> | --- | --- | --- |
> | `PREFECT_WORKER_ENABLE_CANCELLATION` | `false` | Feature flag to enable worker-side cancellation |
> | `PREFECT_WORKER_CANCELLATION_GRACE_PERIOD_SECONDS` | `60` | Seconds to wait before force-cancelling. Set to `-1` to disable |
If users can disable worker cancellation by providing `-1` to `PREFECT_WORKER_CANCELLATION_GRACE_PERIOD_SECONDS`, then maybe we don't need `PREFECT_WORKER_ENABLE_CANCELLATION`.
I am not sure if Claude fully understood my intent here. I don't think we would ever set `-1` as an env var. My thought was that eventually we could make server-side changes to allow for a `cancellation_grace_period` for a given flow. If that flow has complex cancellation_hooks that naturally take longer than a sensible default grace period, those hooks would not complete. The user could then set an extended grace period, or disable any worker force-kill with the understanding that their flows could get stuck in `CANCELLING`. A user could also set the grace period to `0` to immediately cancel that flow.
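For whatever form the value eventually takes (a worker setting now, possibly a per-flow `cancellation_grace_period` later), the interpretation described here is simple enough to sketch (names are illustrative):

```python
from typing import Optional


def effective_delay(grace_period_seconds: float, elapsed_seconds: float) -> Optional[float]:
    """How long to wait before force-cancelling, or None to never force-cancel.

    -1 disables worker-side force cancellation entirely.
     0 force-cancels immediately.
     N waits until N seconds have passed since the run entered CANCELLING.
    """
    if grace_period_seconds == -1:
        return None
    return max(0.0, grace_period_seconds - elapsed_seconds)
```

For example, `effective_delay(60, 75)` returns `0.0` (kill now), while `effective_delay(-1, 75)` returns `None` (never force-kill).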
> This works well when the Runner is healthy and responsive. However, cancellation fails when:
>
> - The flow process is stuck or hung (infinite loop, deadlock, blocking I/O)
> - The Runner crashed before receiving the cancellation signal
Handling this case might be out of scope for this proposal, since in cases like this the flow run should be marked as crashed by the worker. If we're seeing issues here, then improving the crash detection functionality of the workers is the way to go.
Again, I think this is just Claude digging deep to show off. I don't think this is a real case. I should have looked at those cases a little closer and cleaned them up.
> This works well when the Runner is healthy and responsive. However, cancellation fails when:
>
> - The flow process is stuck or hung (infinite loop, deadlock, blocking I/O)
Even if the flow process is stuck, the Runner process is the one that handles cancellation by killing the flow process, so I'm not sure this is a valid failure scenario.
Sorry, this is just Claude being a know-it-all... I don't think all these cases are real.