Skip to content

Commit 5e6ad52

Browse files
Fix upgrade for same versions (#7635)
1 parent 6a9d232 commit 5e6ad52

File tree

8 files changed

+389
-62
lines changed

8 files changed

+389
-62
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,28 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
9292
// User is responsible for obtaining and releasing bkgMutex lock
9393
func (h *Upgrade) ackActions(ctx context.Context, ack acker.Acker) {
9494
for _, a := range h.bkgActions {
95-
if err := ack.Ack(ctx, a); err != nil {
96-
h.log.Errorf("ack of failed upgrade failed: %v", err)
97-
}
95+
h.ackAction(ctx, ack, a, false)
9896
}
9997
h.bkgActions = nil
10098
if err := ack.Commit(ctx); err != nil {
10199
h.log.Errorf("commit of ack for failed upgrade failed: %v", err)
102100
}
103101
}
104102

103+
// ackActions Acks all the actions in bkgActions, and deletes entries from bkgActions.
104+
// User is responsible for obtaining and releasing bkgMutex lock
105+
func (h *Upgrade) ackAction(ctx context.Context, ack acker.Acker, action fleetapi.Action, commit bool) {
106+
if err := ack.Ack(ctx, action); err != nil {
107+
h.log.Errorf("ack of failed upgrade failed: %v", err)
108+
}
109+
110+
if commit {
111+
if err := ack.Commit(ctx); err != nil {
112+
h.log.Errorf("commit of ack for failed upgrade failed: %v", err)
113+
}
114+
}
115+
}
116+
105117
// getAsyncContext returns a cancelContext and whether or not to run the upgrade
106118
func (h *Upgrade) getAsyncContext(ctx context.Context, action fleetapi.Action, ack acker.Acker) (context.Context, bool) {
107119
h.bkgMutex.Lock()
@@ -125,11 +137,22 @@ func (h *Upgrade) getAsyncContext(ctx context.Context, action fleetapi.Action, a
125137
h.log.Errorf("invalid type, expected ActionUpgrade and received %T", action)
126138
return nil, false
127139
}
128-
if (upgradeAction.Data.Version == bkgAction.Data.Version) &&
129-
(upgradeAction.Data.SourceURI == bkgAction.Data.SourceURI) {
140+
if upgradeAction.ActionID == bkgAction.ActionID {
130141
h.log.Infof("Duplicate upgrade to version %s received",
131142
bkgAction.Data.Version)
132-
h.bkgActions = append(h.bkgActions, action)
143+
return nil, false
144+
}
145+
146+
if upgradeAction.Data.Version == bkgAction.Data.Version &&
147+
upgradeAction.Data.SourceURI == bkgAction.Data.SourceURI {
148+
// not the same action this one needs to be acked
149+
go func() {
150+
// kick it off and don't block, lock to prevent race with ackActions from finished upgrade
151+
h.bkgMutex.Lock()
152+
defer h.bkgMutex.Unlock()
153+
154+
h.ackAction(ctx, ack, upgradeAction, true)
155+
}()
133156
return nil, false
134157
}
135158

internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go

Lines changed: 127 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/stretchr/testify/mock"
1415
"github.com/stretchr/testify/require"
1516

1617
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
@@ -68,7 +69,11 @@ func (u *mockUpgradeManager) Upgrade(
6869
pgpBytes...)
6970
}
7071

71-
func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error {
72+
func (u *mockUpgradeManager) Ack(_ context.Context, _ acker.Acker) error {
73+
return nil
74+
}
75+
76+
func (u *mockUpgradeManager) AckAction(_ context.Context, _ acker.Acker, _ fleetapi.Action) error {
7277
return nil
7378
}
7479

@@ -110,7 +115,7 @@ func TestUpgradeHandler(t *testing.T) {
110115
return nil, nil
111116
},
112117
},
113-
nil, nil, nil, nil, nil, false, nil)
118+
nil, nil, nil, nil, nil, false, nil, nil)
114119
//nolint:errcheck // We don't need the termination state of the Coordinator
115120
go c.Run(ctx)
116121

@@ -169,7 +174,7 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
169174
return nil, err
170175
},
171176
},
172-
nil, nil, nil, nil, nil, false, nil)
177+
nil, nil, nil, nil, nil, false, nil, nil)
173178
//nolint:errcheck // We don't need the termination state of the Coordinator
174179
go c.Run(ctx)
175180

@@ -190,6 +195,98 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
190195
}
191196
}
192197

198+
func TestDuplicateActionsHandled(t *testing.T) {
199+
// Create a cancellable context that will shut down the coordinator after
200+
// the test.
201+
ctx, cancel := context.WithCancel(context.Background())
202+
defer cancel()
203+
204+
log, _ := logger.New("", false)
205+
upgradeCalledChan := make(chan string)
206+
207+
agentInfo := &info.AgentInfo{}
208+
acker := &fakeAcker{}
209+
210+
// Create and start the Coordinator
211+
c := coordinator.New(
212+
log,
213+
configuration.DefaultConfiguration(),
214+
logger.DefaultLogLevel,
215+
agentInfo,
216+
component.RuntimeSpecs{},
217+
nil,
218+
&mockUpgradeManager{
219+
UpgradeFn: func(
220+
ctx context.Context,
221+
version string,
222+
sourceURI string,
223+
action *fleetapi.ActionUpgrade,
224+
details *details.Details,
225+
skipVerifyOverride bool,
226+
skipDefaultPgp bool,
227+
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
228+
229+
defer func() {
230+
upgradeCalledChan <- action.ActionID
231+
}()
232+
233+
return nil, nil
234+
},
235+
},
236+
nil, nil, nil, nil, nil, false, nil, acker)
237+
//nolint:errcheck // We don't need the termination state of the Coordinator
238+
go c.Run(ctx)
239+
240+
u := NewUpgrade(log, c)
241+
a1 := fleetapi.ActionUpgrade{
242+
ActionID: "action-8.5-1",
243+
Data: fleetapi.ActionUpgradeData{
244+
Version: "8.5.0", SourceURI: "http://localhost",
245+
},
246+
}
247+
a2 := fleetapi.ActionUpgrade{
248+
ActionID: "action-8.5-2",
249+
Data: fleetapi.ActionUpgradeData{
250+
Version: "8.5.0", SourceURI: "http://localhost",
251+
},
252+
}
253+
254+
checkMsg := func(c <-chan string, expected, errMsg string) error {
255+
t.Helper()
256+
// Make sure this test does not dead lock or wait for too long
257+
// For some reason < 1s sometimes makes the test fail.
258+
select {
259+
case <-time.Tick(1500 * time.Millisecond):
260+
return errors.New("timed out waiting for Upgrade to return")
261+
case msg := <-c:
262+
require.Equal(t, expected, msg, errMsg)
263+
}
264+
265+
return nil
266+
}
267+
268+
acker.On("Ack", mock.Anything, mock.Anything).Return(nil)
269+
acker.On("Commit", mock.Anything).Return(nil)
270+
271+
t.Log("First upgrade action should be processed")
272+
require.NoError(t, u.Handle(ctx, &a1, acker))
273+
require.Nil(t, checkMsg(upgradeCalledChan, a1.ActionID, "action was not processed"))
274+
c.ClearOverrideState() // it's upgrading, normally we would restart
275+
276+
t.Log("Action with different ID but same version should not be propagated to upgrader but acked")
277+
require.NoError(t, u.Handle(ctx, &a2, acker))
278+
require.NotNil(t, checkMsg(upgradeCalledChan, a2.ActionID, "action was not processed"))
279+
acker.AssertCalled(t, "Ack", ctx, &a2)
280+
acker.AssertCalled(t, "Commit", ctx)
281+
282+
c.ClearOverrideState() // it's upgrading, normally we would restart
283+
284+
t.Log("Resending action with same ID should be skipped")
285+
require.NoError(t, u.Handle(ctx, &a1, acker))
286+
require.NotNil(t, checkMsg(upgradeCalledChan, a1.ActionID, "action was not processed"))
287+
acker.AssertNotCalled(t, "Ack", ctx, &a1)
288+
}
289+
193290
func TestUpgradeHandlerNewVersion(t *testing.T) {
194291
// Create a cancellable context that will shut down the coordinator after
195292
// the test.
@@ -230,15 +327,23 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
230327
return nil, nil
231328
},
232329
},
233-
nil, nil, nil, nil, nil, false, nil)
330+
nil, nil, nil, nil, nil, false, nil, nil)
234331
//nolint:errcheck // We don't need the termination state of the Coordinator
235332
go c.Run(ctx)
236333

237334
u := NewUpgrade(log, c)
238-
a1 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
239-
Version: "8.2.0", SourceURI: "http://localhost"}}
240-
a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
241-
Version: "8.5.0", SourceURI: "http://localhost"}}
335+
a1 := fleetapi.ActionUpgrade{
336+
ActionID: "action-8.2",
337+
Data: fleetapi.ActionUpgradeData{
338+
Version: "8.2.0", SourceURI: "http://localhost",
339+
},
340+
}
341+
a2 := fleetapi.ActionUpgrade{
342+
ActionID: "action-8.5",
343+
Data: fleetapi.ActionUpgradeData{
344+
Version: "8.5.0", SourceURI: "http://localhost",
345+
},
346+
}
242347
ack := noopacker.New()
243348

244349
checkMsg := func(c <-chan string, expected, errMsg string) {
@@ -262,3 +367,17 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
262367
require.NoError(t, err2)
263368
checkMsg(upgradeCalledChan, "8.5.0", "second call to Upgrade must be with version 8.5.0")
264369
}
370+
371+
type fakeAcker struct {
372+
mock.Mock
373+
}
374+
375+
func (f *fakeAcker) Ack(ctx context.Context, action fleetapi.Action) error {
376+
args := f.Called(ctx, action)
377+
return args.Error(0)
378+
}
379+
380+
func (f *fakeAcker) Commit(ctx context.Context) error {
381+
args := f.Called(ctx)
382+
return args.Error(0)
383+
}

internal/pkg/agent/application/application.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@ import (
2121
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
2222
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2323
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
24+
stateStore "github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
2425
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
2526
"github.com/elastic/elastic-agent/internal/pkg/composable"
2627
"github.com/elastic/elastic-agent/internal/pkg/composable/providers/kubernetes"
2728
"github.com/elastic/elastic-agent/internal/pkg/config"
29+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
30+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/fleet"
31+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
32+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
33+
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
2834
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
2935
"github.com/elastic/elastic-agent/internal/pkg/release"
3036
"github.com/elastic/elastic-agent/pkg/component"
@@ -136,6 +142,7 @@ func New(
136142
var composableManaged bool
137143
var isManaged bool
138144

145+
var actionAcker acker.Acker
139146
if testingMode {
140147
log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol")
141148

@@ -182,8 +189,29 @@ func New(
182189
InjectProxyEndpointModifier(),
183190
)
184191

192+
client, err := fleetclient.NewAuthWithConfig(log, cfg.Fleet.AccessAPIKey, cfg.Fleet.Client)
193+
if err != nil {
194+
return nil, nil, nil, errors.New(err,
195+
"fail to create API client",
196+
errors.TypeNetwork,
197+
errors.M(errors.MetaKeyURI, cfg.Fleet.Client.Host))
198+
}
199+
stateStorage, err := stateStore.NewStateStoreWithMigration(ctx, log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
200+
if err != nil {
201+
return nil, nil, nil, errors.New(err, fmt.Sprintf("fail to read state store '%s'", paths.AgentStateStoreFile()))
202+
}
203+
204+
fleetAcker, err := fleet.NewAcker(log, agentInfo, client)
205+
if err != nil {
206+
return nil, nil, nil, fmt.Errorf("failed to create acker: %w", err)
207+
}
208+
209+
retrier := retrier.New(fleetAcker, log)
210+
batchedAcker := lazy.NewAcker(fleetAcker, log, lazy.WithRetrier(retrier))
211+
actionAcker = stateStore.NewStateStoreActionAcker(batchedAcker, stateStorage)
212+
185213
// TODO: stop using global state
186-
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), upgrader)
214+
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, upgrader)
187215
if err != nil {
188216
return nil, nil, nil, err
189217
}
@@ -197,7 +225,7 @@ func New(
197225
}
198226

199227
otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"))
200-
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, compModifiers...)
228+
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, compModifiers...)
201229
if managed != nil {
202230
// the coordinator requires the config manager as well as in managed-mode the config manager requires the
203231
// coordinator, so it must be set here once the coordinator is created

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ type UpgradeManager interface {
7575
// Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action
7676
Ack(ctx context.Context, acker acker.Acker) error
7777

78+
// AckAction is used to ack not persisted action.
79+
AckAction(ctx context.Context, acker acker.Acker, action fleetapi.Action) error
80+
7881
// MarkerWatcher returns a watcher for the upgrade marker.
7982
MarkerWatcher() upgrade.MarkerWatcher
8083
}
@@ -204,8 +207,9 @@ type Coordinator struct {
204207
agentInfo info.Agent
205208
isManaged bool
206209

207-
cfg *configuration.Configuration
208-
specs component.RuntimeSpecs
210+
cfg *configuration.Configuration
211+
specs component.RuntimeSpecs
212+
fleetAcker acker.Acker
209213

210214
reexecMgr ReExecManager
211215
upgradeMgr UpgradeManager
@@ -372,7 +376,7 @@ type UpdateComponentChange struct {
372376
}
373377

374378
// New creates a new coordinator.
375-
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, otelMgr OTelManager, modifiers ...ComponentsModifier) *Coordinator {
379+
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, otelMgr OTelManager, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator {
376380
var fleetState cproto.State
377381
var fleetMessage string
378382
if !isManaged {
@@ -426,6 +430,8 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
426430
heartbeatChan: make(chan struct{}),
427431
componentPIDTicker: time.NewTicker(time.Second * 30),
428432
componentPidRequiresUpdate: &atomic.Bool{},
433+
434+
fleetAcker: fleetAcker,
429435
}
430436
// Setup communication channels for any non-nil components. This pattern
431437
// lets us transparently accept nil managers / simulated events during
@@ -585,7 +591,7 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
585591
if errors.Is(err, upgrade.ErrUpgradeSameVersion) {
586592
// Set upgrade state to completed so update no longer shows in-progress.
587593
det.SetState(details.StateCompleted)
588-
return nil
594+
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
589595
}
590596
det.Fail(err)
591597
return err

0 commit comments

Comments
 (0)