Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,12 @@ func (r *ResourceManager) ListJobs(filterContext *model.FilterContext, opts *lis

// Terminates a workflow by setting its activeDeadlineSeconds to 0.
func TerminateWorkflow(ctx context.Context, wfClient util.ExecutionInterface, name string) error {
// First check if workflow is already in final state to prevent unnecessary patches or racing.
latestWorkflow, err := wfClient.Get(ctx, name, v1.GetOptions{})
if err == nil && latestWorkflow.ExecutionStatus() != nil && latestWorkflow.ExecutionStatus().IsInFinalState() {
return nil
}

patchObj := util.GetTerminatePatch(util.CurrentExecutionType())
patch, err := json.Marshal(patchObj)
if err != nil {
Expand Down Expand Up @@ -1125,25 +1131,70 @@ func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error {
return util.NewInternalServerError(err, "Failed to retry run %s due to error cleaning up the failed pods from the previous attempt", runId)
}

// First try to update workflow
// If fail to get the workflow, return error.
latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{})
if updateError == nil {
// Update the workflow's resource version to latest.
newExecSpec.SetVersion(latestWorkflow.Version())
_, updateError = r.getWorkflowClient(namespace).Update(ctx, newExecSpec, v1.UpdateOptions{})
}
if updateError != nil {
// Remove resource version
newExecSpec.SetVersion("")
newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(ctx, newExecSpec, v1.CreateOptions{})
if createError != nil {
if createError, ok := createError.(net.Error); ok && createError.Timeout() {
return util.NewUnavailableServerError(createError, "Failed to retry run %s due to error creating and updating a workflow - try again later. Update error: %s", runId, updateError.Error())
// First try to get the workflow and update it.
// If we fail to get the workflow (e.g., NotFound), we fall back to creating it.
maxRetries := 10
var finalErr error
b := backoff.NewExponentialBackOff()
b.InitialInterval = 100 * time.Millisecond
b.MaxInterval = 2 * time.Second
b.Reset()

for i := 0; i < maxRetries; i++ {
if ctx.Err() != nil {
return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId)
}

latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{})
if updateError == nil {
if err := latestWorkflow.CanRetry(); err != nil {
return util.NewInternalServerError(err, "Failed to retry run %s as the refetched workflow does not allow retries", runId)
}
// Update the workflow's resource version to latest.
newExecSpec.SetVersion(latestWorkflow.Version())
_, updateError = r.getWorkflowClient(namespace).Update(ctx, newExecSpec, v1.UpdateOptions{})
}
if updateError != nil {
if apierrors.IsConflict(errors.Unwrap(updateError)) || apierrors.IsConflict(updateError) {
finalErr = updateError
delay := b.NextBackOff()
timer := time.NewTimer(delay)
select {
case <-timer.C:
case <-ctx.Done():
timer.Stop()
return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId)
}
continue
Comment on lines +1158 to +1168
}
// Remove resource version
newExecSpec.SetVersion("")
newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(ctx, newExecSpec, v1.CreateOptions{})
if createError != nil {
if apierrors.IsAlreadyExists(errors.Unwrap(createError)) || apierrors.IsAlreadyExists(createError) {
finalErr = createError
delay := b.NextBackOff()
timer := time.NewTimer(delay)
select {
case <-timer.C:
case <-ctx.Done():
timer.Stop()
return util.NewInternalServerError(ctx.Err(), "Failed to retry run %s due to context cancellation", runId)
}
continue
Comment on lines +1174 to +1184
}
if createError, ok := createError.(net.Error); ok && createError.Timeout() {
return util.NewUnavailableServerError(createError, "Failed to retry run %s due to error creating and updating a workflow - try again later. Update error: %s", runId, updateError.Error())
}
return util.NewInternalServerError(createError, "Failed to retry run %s due to error updating and creating a workflow. Update error: %s", runId, updateError.Error())
}
return util.NewInternalServerError(createError, "Failed to retry run %s due to error updating and creating a workflow. Update error: %s", runId, updateError.Error())
newExecSpec = newCreatedWorkflow
}
newExecSpec = newCreatedWorkflow
finalErr = nil
break
}
if finalErr != nil {
return util.NewInternalServerError(finalErr, "Failed to retry run %s due to exhausted retries", runId)
}
// Notify plugins of retry
if run.PluginsOutputString != nil && *run.PluginsOutputString != "" {
Expand Down
Loading