Skip to content

Commit 248a1ee

Browse files
author
Neun Chaler
committed
Enable Concurrent Scaling and Handle Lease Error
1 parent 363de79 commit 248a1ee

2 files changed

Lines changed: 46 additions & 20 deletions

File tree

cluster-autoscaler/cloudprovider/crusoecloud/crusoe_node_group.go

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -116,25 +116,32 @@ func (ng *crusoeNodeGroup) IncreaseSize(delta int) error {
116116
klog.Errorf("IncreaseSize,PoolID=%s, failed trying to set target nodepool size to %d: %v", ng.pool.Id, targetSize, err)
117117
return err
118118
}
119-
op, err = ng.manager.WaitForNodePoolOperationComplete(ctx, op)
120-
if err != nil {
121-
klog.Errorf("IncreaseSize,PoolID=%s, failed waiting to set target nodepool size to %d: %v", ng.pool.Id, targetSize, err)
122-
return fmt.Errorf("couldn't increase pool size to %d: %w", targetSize, err)
123-
}
124-
if op.State == string(opFailed) {
125-
klog.Errorf("IncreaseSize,PoolID=%s, failed to set target nodepool size to %d: operation failed with %v", ng.pool.Id, targetSize, op.Result)
126-
return fmt.Errorf("couldn't increase pool size to %d: operation failed with %v", targetSize, op.Result)
127-
}
128119

129-
err = ng.refresh()
130-
if err != nil {
131-
klog.Errorf("IncreaseSize,PoolID=%s, failed to refresh node group after increase size: %v", ng.pool.Id, err)
132-
return fmt.Errorf("failed to refresh node group after increase size: %v", err)
120+
refreshErr := ng.refresh()
121+
if refreshErr != nil {
122+
klog.Errorf("IncreaseSize (background),PoolID=%s, failed to refresh node group: %v", ng.Id(), refreshErr)
133123
}
134124

125+
// The target size has already been updated, so waiting for the VMs to be created can happen asynchronously.
126+
go ng.trackIncreaseSizeAsync(ng.pool.Id, op)
127+
135128
return nil
136129
}
137130

131+
func (ng *crusoeNodeGroup) trackIncreaseSizeAsync(poolID string, op *crusoeapi.Operation) {
132+
ctx := context.Background()
133+
klog.V(5).Infof("IncreaseSize (background): waiting for opID=%s on poolID=%s", op.OperationId, poolID)
134+
135+
finalOp, waitErr := ng.manager.WaitForNodePoolOperationComplete(ctx, op)
136+
if waitErr != nil {
137+
klog.Errorf("IncreaseSize (background),PoolID=%s, failed waiting for opID=%s: %v", poolID, op.OperationId, waitErr)
138+
}
139+
140+
if finalOp.State == string(opFailed) {
141+
klog.Errorf("IncreaseSize (background),PoolID=%s, opID=%s failed: %v", poolID, op.OperationId, finalOp.Result)
142+
}
143+
}
144+
138145
// AtomicIncreaseSize is not implemented.
139146
func (ng *crusoeNodeGroup) AtomicIncreaseSize(delta int) error {
140147
return cloudprovider.ErrNotImplemented
@@ -174,7 +181,7 @@ func (ng *crusoeNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
174181
ng.nodeGroupRWMutex.RUnlock()
175182

176183
targetSize := min(ng.targetSize-len(nodeIDsToDelete), int(ng.pool.Count))
177-
klog.V(4).Infof("DeleteNodes,%d nodes to reclaim (%d target size); ng=%v, pool=%v", len(nodes), targetSize, ng, ng.pool)
184+
klog.V(4).Infof("DeleteNodes,%d nodes to reclaim (%d target size); ng=%v, pool id=%v", len(nodes), targetSize, ng, ng.pool.Id)
178185
if targetSize >= int(ng.pool.Count) {
179186
klog.V(4).Infof("DeleteNodes,PoolID=%s, new target size (%d) greater than or equal to the desired count (%d), skip updating desired count",
180187
ng.pool.Id, targetSize, ng.pool.Count,
@@ -205,6 +212,7 @@ func (ng *crusoeNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
205212
var multiErr error
206213

207214
vmOps := make([]*crusoeapi.Operation, 0, len(nodeIDsToDelete))
215+
nodesInDeletionSet := make([]string, len(nodeIDsToDelete))
208216
for _, id := range nodeIDsToDelete {
209217
op, err := ng.manager.DeleteVMInstance(ctx, id)
210218
if err != nil {
@@ -214,8 +222,8 @@ func (ng *crusoeNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
214222
continue
215223
}
216224
ng.addNodeToDeletionInProgressSet(id)
217-
defer ng.removeNodeFromDeletionInProgressSet(id)
218225
vmOps = append(vmOps, op)
226+
nodesInDeletionSet = append(nodesInDeletionSet, id)
219227
}
220228

221229
err = ng.refresh()
@@ -227,11 +235,16 @@ func (ng *crusoeNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
227235
scalingMutexUnlocked = true
228236
ng.scalingMutex.Unlock()
229237

230-
_, err = ng.manager.WaitForVMOperationListComplete(ctx, vmOps)
231-
if err != nil {
232-
klog.Errorf("DeleteNodes,failed to delete one or more nodes: %v", err)
233-
multiErr = multierr.Append(multiErr, fmt.Errorf("failed to wait for all vm operations or some operations failed: %v", err))
234-
}
238+
go func() {
239+
// The target size has already been updated, so waiting for the VM operations can happen asynchronously.
240+
_, err = ng.manager.WaitForVMOperationListComplete(ctx, vmOps)
241+
if err != nil {
242+
klog.Errorf("DeleteNodes (background),failed to delete one or more nodes: %v", err)
243+
}
244+
for _, id := range nodesInDeletionSet {
245+
ng.removeNodeFromDeletionInProgressSet(id)
246+
}
247+
}()
235248

236249
return multiErr
237250
}

cluster-autoscaler/cloudprovider/crusoecloud/op_wait_utils.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
// Defaults used when waiting for an operation to complete.
const (
3333
// NOTE(review): presumably the base back-off between polls; the unit is not visible here — confirm.
operationBackoffIntervalDefault = 1
3434
// NOTE(review): presumably the range of random jitter added to the back-off; the unit is not visible here — confirm.
operationBackoffJitterRangeDefault = 1000
35+
// operationTimeout is the deadline WaitForOperationComplete puts on its context while polling an in-progress operation.
operationTimeout = 30 * time.Minute
3536
)
3637

3738
var (
@@ -107,7 +108,19 @@ func (w *waitBackoff) WaitForOperationComplete(ctx context.Context, op *crusoeap
107108
return nil, nil
108109
}
109110

111+
ctx, cancel := context.WithTimeout(ctx, operationTimeout)
112+
defer cancel()
113+
110114
for op.State == string(opInProgress) {
115+
select {
116+
case <-ctx.Done():
117+
return op, fmt.Errorf(
118+
"operation %s did not complete within %v: %w",
119+
op.OperationId, operationTimeout, ctx.Err(),
120+
)
121+
default:
122+
}
123+
111124
updatedOp, err := pollOp(ctx, op.OperationId)
112125
if err != nil {
113126
return nil, fmt.Errorf("error getting operation with id %s: %w", op.OperationId, err)

0 commit comments

Comments
 (0)