Fix/workflow state on conflict 2#13531
Conversation
…low#13507 Signed-off-by: Ayush Gupta <kathilshiva@gmail.com>
… terminate Signed-off-by: Ayush Gupta <kathilshiva@gmail.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Hi @Ayush-kathil. Thanks for your PR. I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Improves workflow termination and run retry behavior by avoiding unnecessary terminate patches and adding retry logic around Kubernetes workflow update/create conflicts.
Changes:
- Skip terminate patching when a workflow is already in a final state.
- Add bounded retry loop for workflow update/create during
RetryRun, handling conflict and already-exists errors.
| if apierrors.IsConflict(errors.Unwrap(updateError)) || apierrors.IsConflict(updateError) { | ||
| finalErr = updateError | ||
| time.Sleep(100 * time.Millisecond) | ||
| continue |
| if apierrors.IsAlreadyExists(errors.Unwrap(createError)) || apierrors.IsAlreadyExists(createError) { | ||
| finalErr = createError | ||
| time.Sleep(100 * time.Millisecond) | ||
| continue |
| // First try to update workflow | ||
| // If fail to get the workflow, return error. |
| return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId) | ||
| } | ||
|
|
||
| latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{}) |
| } | ||
| newExecSpec = newCreatedWorkflow | ||
| if updateError != nil { | ||
| if apierrors.IsConflict(errors.Unwrap(updateError)) || apierrors.IsConflict(updateError) { |
| newExecSpec.SetVersion("") | ||
| newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(ctx, newExecSpec, v1.CreateOptions{}) | ||
| if createError != nil { | ||
| if apierrors.IsAlreadyExists(errors.Unwrap(createError)) || apierrors.IsAlreadyExists(createError) { |
…ryRun Signed-off-by: Ayush Gupta <kathilshiva@gmail.com>
What this PR does
This is a follow-up to PR #13508 that adds extra state validation checks during
RetryRunandTerminateRun.When the API server encounters a conflict during a retry or terminate operation, it refetches the latest workflow state and tries again. However, in rare cases (like two manual requests racing each other), the first request might successfully move the workflow into a
Runningor terminal state, while the second request refetches it and blindly applies the operation again.To fix this:
FailedorError) using.CanRetry()before trying to update it again.IsInFinalState()) before patching theactiveDeadlineSeconds.This ensures we don't accidentally retry a running workflow or redundantly terminate an already completed one.
Related Issues
Follow-up to #13508
Fixes #13507