From cfda12c4db18e82bf055bea4523a90714741ff50 Mon Sep 17 00:00:00 2001 From: Ayush Gupta Date: Tue, 9 Jun 2026 15:48:35 +0530 Subject: [PATCH 1/3] fix(backend): fix RetryRun concurrency conflict handling. Fixes #13507 Signed-off-by: Ayush Gupta --- .../apiserver/resource/resource_manager.go | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index c69a52e1dc1..93ace9e3a0d 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1127,23 +1127,46 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error { // First try to update workflow // If fail to get the workflow, return error. - latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{}) - if updateError == nil { - // Update the workflow's resource version to latest. - newExecSpec.SetVersion(latestWorkflow.Version()) - _, updateError = r.getWorkflowClient(namespace).Update(ctx, newExecSpec, v1.UpdateOptions{}) - } - if updateError != nil { - // Remove resource version - newExecSpec.SetVersion("") - newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(ctx, newExecSpec, v1.CreateOptions{}) - if createError != nil { - if createError, ok := createError.(net.Error); ok && createError.Timeout() { - return util.NewUnavailableServerError(createError, "Failed to retry run %s due to error creating and updating a workflow - try again later. Update error: %s", runId, updateError.Error()) + maxRetries := 10 + var finalErr error + for i := 0; i < maxRetries; i++ { + if ctx.Err() != nil { + return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId) + } + + latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{}) + if updateError == nil { + // Update the workflow's resource version to latest. + newExecSpec.SetVersion(latestWorkflow.Version()) + _, updateError = r.getWorkflowClient(namespace).Update(ctx, newExecSpec, v1.UpdateOptions{}) + } + if updateError != nil { + if apierrors.IsConflict(errors.Unwrap(updateError)) || apierrors.IsConflict(updateError) { + finalErr = updateError + time.Sleep(100 * time.Millisecond) + continue } - return util.NewInternalServerError(createError, "Failed to retry run %s due to error updating and creating a workflow. Update error: %s", runId, updateError.Error()) + // Remove resource version + newExecSpec.SetVersion("") + newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(ctx, newExecSpec, v1.CreateOptions{}) + if createError != nil { + if apierrors.IsAlreadyExists(errors.Unwrap(createError)) || apierrors.IsAlreadyExists(createError) { + finalErr = createError + time.Sleep(100 * time.Millisecond) + continue + } + if createError, ok := createError.(net.Error); ok && createError.Timeout() { + return util.NewUnavailableServerError(createError, "Failed to retry run %s due to error creating and updating a workflow - try again later. Update error: %s", runId, updateError.Error()) + } + return util.NewInternalServerError(createError, "Failed to retry run %s due to error updating and creating a workflow. Update error: %s", runId, updateError.Error()) + } + newExecSpec = newCreatedWorkflow } - newExecSpec = newCreatedWorkflow + finalErr = nil + break + } + if finalErr != nil { + return util.NewInternalServerError(finalErr, "Failed to retry run %s due to exhausted retries", runId) } // Notify plugins of retry if run.PluginsOutputString != nil && *run.PluginsOutputString != "" { From 883565b5ab87a6c76cf22350dd2ac794ce8cb20e Mon Sep 17 00:00:00 2001 From: Ayush Gupta Date: Tue, 16 Jun 2026 11:00:42 +0530 Subject: [PATCH 2/3] fix(backend): verify workflow state on conflict refetch for retry and terminate Signed-off-by: Ayush Gupta --- backend/src/apiserver/resource/resource_manager.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 93ace9e3a0d..bf838b446ee 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1041,6 +1041,12 @@ func (r *ResourceManager) ListJobs(filterContext *model.FilterContext, opts *lis // Terminates a workflow by setting its activeDeadlineSeconds to 0. func TerminateWorkflow(ctx context.Context, wfClient util.ExecutionInterface, name string) error { + // First check if workflow is already in final state to prevent unnecessary patches or racing. + latestWorkflow, err := wfClient.Get(ctx, name, v1.GetOptions{}) + if err == nil && latestWorkflow.ExecutionStatus() != nil && latestWorkflow.ExecutionStatus().IsInFinalState() { + return nil + } + patchObj := util.GetTerminatePatch(util.CurrentExecutionType()) patch, err := json.Marshal(patchObj) if err != nil { @@ -1136,6 +1142,9 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error { latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{}) if updateError == nil { + if err := latestWorkflow.CanRetry(); err != nil { + return util.NewInternalServerError(err, "Failed to retry run %s as the refetched workflow does not allow retries", runId) + } // Update the workflow's resource version to latest. newExecSpec.SetVersion(latestWorkflow.Version()) _, updateError = r.getWorkflowClient(namespace).Update(ctx, newExecSpec, v1.UpdateOptions{}) From bea18a20e7a4d391d14cb4fc80e3c71a304de4d9 Mon Sep 17 00:00:00 2001 From: Ayush Gupta Date: Tue, 16 Jun 2026 11:10:16 +0530 Subject: [PATCH 3/3] fix(backend): use exponential backoff and select on ctx.Done() in RetryRun Signed-off-by: Ayush Gupta --- .../apiserver/resource/resource_manager.go | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index bf838b446ee..5604b92019d 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1131,10 +1131,15 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error { return util.NewInternalServerError(err, "Failed to retry run %s due to error cleaning up the failed pods from the previous attempt", runId) } - // First try to update workflow - // If fail to get the workflow, return error. + // First try to get the workflow and update it. + // If we fail to get the workflow (e.g., NotFound), we fall back to creating it. maxRetries := 10 var finalErr error + b := backoff.NewExponentialBackOff() + b.InitialInterval = 100 * time.Millisecond + b.MaxInterval = 2 * time.Second + b.Reset() + for i := 0; i < maxRetries; i++ { if ctx.Err() != nil { return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId) @@ -1152,7 +1157,14 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error { if updateError != nil { if apierrors.IsConflict(errors.Unwrap(updateError)) || apierrors.IsConflict(updateError) { finalErr = updateError - time.Sleep(100 * time.Millisecond) + delay := b.NextBackOff() + timer := time.NewTimer(delay) + select { + case <-timer.C: + case <-ctx.Done(): + timer.Stop() + return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId) + } continue } // Remove resource version @@ -1161,7 +1173,14 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error { if createError != nil { if apierrors.IsAlreadyExists(errors.Unwrap(createError)) || apierrors.IsAlreadyExists(createError) { finalErr = createError - time.Sleep(100 * time.Millisecond) + delay := b.NextBackOff() + timer := time.NewTimer(delay) + select { + case <-timer.C: + case <-ctx.Done(): + timer.Stop() + return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId) + } continue } if createError, ok := createError.(net.Error); ok && createError.Timeout() {