Skip to content

Commit 11454d2

Browse files
committed
Use sync.WaitGroup to avoid exiting before components have shut down
Currently only waits on etcd and kine, as other components are stateless and do not need to shut down cleanly. Terminal but non-fatal errors now request shutdown via context cancellation, instead of just logging a fatal error. Signed-off-by: Brad Davidson <[email protected]>
1 parent e196228 commit 11454d2

File tree

32 files changed

+382
-252
lines changed

32 files changed

+382
-252
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ replace (
1717
github.com/golang/protobuf => github.com/golang/protobuf v1.5.4
1818
github.com/google/cadvisor => github.com/k3s-io/cadvisor v0.52.1
1919
github.com/googleapis/gax-go/v2 => github.com/googleapis/gax-go/v2 v2.12.0
20+
github.com/k3s-io/kine => github.com/brandond/kine v0.14.1-0.20250914084328-b26c9d642f69
2021
github.com/open-policy-agent/opa => github.com/open-policy-agent/opa v0.59.0 // github.com/Microsoft/hcsshim using bad version v0.42.2
2122
github.com/opencontainers/runc => github.com/opencontainers/runc v1.3.1
2223
github.com/opencontainers/selinux => github.com/opencontainers/selinux v1.11.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y
295295
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
296296
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
297297
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
298+
github.com/brandond/kine v0.14.1-0.20250914084328-b26c9d642f69 h1:v5gN3RUPblkWsMNc0vvWHIFWLQMNgybCHI/7ErlnthE=
299+
github.com/brandond/kine v0.14.1-0.20250914084328-b26c9d642f69/go.mod h1:mtcQsUUA0XpbKlQJevLERc4YU1ao44ErPCbbItpZLjo=
298300
github.com/bronze1man/goStrongswanVici v0.0.0-20231128135937-211cef3b0b20 h1:JMoL5xJSYxo1QVJ3c+4FutWQnks3gZX9DYkgAnvg+5g=
299301
github.com/bronze1man/goStrongswanVici v0.0.0-20231128135937-211cef3b0b20/go.mod h1:fWUtBEPt2yjrr3WFhOqvajM8JSEU8bEeBcoeSCsKRpc=
300302
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
@@ -825,8 +827,6 @@ github.com/k3s-io/etcd/server/v3 v3.6.4-k3s3 h1:DY9zD2dDmIw6L0G+nL5s4ivIV73f4lQI
825827
github.com/k3s-io/etcd/server/v3 v3.6.4-k3s3/go.mod h1:aYCL/h43yiONOv0QIR82kH/2xZ7m+IWYjzRmyQfnCAg=
826828
github.com/k3s-io/helm-controller v0.16.13 h1:jMD5lI4Mzo9uclZjLgJ8Yak6wSnXaL/tuTU+dEEx69Q=
827829
github.com/k3s-io/helm-controller v0.16.13/go.mod h1:mw6sVaH/eli+81sUnRqRYh5wV7YVzYp+U7OucLT5kUc=
828-
github.com/k3s-io/kine v0.14.0 h1:4vmVEErYwSPESHI5t0S6gL3aCOfmLlHfKnwrsuvupl4=
829-
github.com/k3s-io/kine v0.14.0/go.mod h1:mtcQsUUA0XpbKlQJevLERc4YU1ao44ErPCbbItpZLjo=
830830
github.com/k3s-io/klog/v2 v2.120.1-k3s1 h1:7twAHPFpZA21KdMnMNnj68STQMPldAxF2Zsaol57dxw=
831831
github.com/k3s-io/klog/v2 v2.120.1-k3s1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
832832
github.com/k3s-io/kube-router/v2 v2.5.0 h1:pG46bYnvi17z/Zp7W1MTeTax8HaFEp7zCzUu59UwKpI=

pkg/agent/containerd/containerd.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/k3s-io/k3s/pkg/agent/cri"
2121
util2 "github.com/k3s-io/k3s/pkg/agent/util"
2222
"github.com/k3s-io/k3s/pkg/daemons/config"
23+
"github.com/k3s-io/k3s/pkg/signals"
2324
"github.com/k3s-io/k3s/pkg/version"
2425
"github.com/natefinch/lumberjack"
2526
pkgerrors "github.com/pkg/errors"
@@ -106,10 +107,9 @@ func Run(ctx context.Context, cfg *config.Node) error {
106107
addDeathSig(cmd)
107108
err := cmd.Run()
108109
if err != nil && !errors.Is(err, context.Canceled) {
109-
logrus.Errorf("containerd exited: %s", err)
110-
os.Exit(1)
110+
signals.RequestShutdown(pkgerrors.WithMessage(err, "containerd exited"))
111111
}
112-
os.Exit(0)
112+
signals.RequestShutdown(nil)
113113
}()
114114

115115
if err := cri.WaitForService(ctx, cfg.Containerd.Address, "containerd"); err != nil {

pkg/agent/containerd/watcher.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,7 @@ func importAndWatchImages(ctx context.Context, cfg *config.Node) error {
259259

260260
// wait for the workqueue to empty before returning
261261
for w.workqueue.Len() > 0 {
262-
logrus.Debugf("Waiting for initial import of images from %s", cfg.Images)
263-
time.Sleep(time.Second * 2)
262+
time.Sleep(500 * time.Millisecond)
264263
}
265264

266265
// prune unseen entries from last run once all existing files have been processed

pkg/agent/cridockerd/cridockerd.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212

1313
"github.com/Mirantis/cri-dockerd/cmd"
1414
"github.com/Mirantis/cri-dockerd/cmd/version"
15+
pkgerrors "github.com/pkg/errors"
1516

1617
"github.com/k3s-io/k3s/pkg/agent/cri"
1718
"github.com/k3s-io/k3s/pkg/cgroups"
1819
"github.com/k3s-io/k3s/pkg/daemons/config"
20+
"github.com/k3s-io/k3s/pkg/signals"
1921
"github.com/k3s-io/k3s/pkg/util"
2022
"github.com/sirupsen/logrus"
2123

@@ -41,10 +43,9 @@ func Run(ctx context.Context, cfg *config.Node) error {
4143
}()
4244
err := command.ExecuteContext(ctx)
4345
if err != nil && !errors.Is(err, context.Canceled) {
44-
logrus.Errorf("cri-dockerd exited: %v", err)
45-
os.Exit(1)
46+
signals.RequestShutdown(pkgerrors.WithMessage(err, "cri-dockerd exited"))
4647
}
47-
os.Exit(0)
48+
signals.RequestShutdown(nil)
4849
}()
4950

5051
return cri.WaitForService(ctx, cfg.CRIDockerd.Address, "cri-dockerd")

pkg/agent/flannel/flannel.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ var (
5151
FlannelExternalIPv6Annotation = FlannelBaseAnnotation + "/public-ipv6-overwrite"
5252
)
5353

54-
func flannel(ctx context.Context, flannelIface *net.Interface, flannelConf, kubeConfigFile string, flannelIPv6Masq bool, nm netMode) error {
54+
func flannel(ctx context.Context, wg *sync.WaitGroup, flannelIface *net.Interface, flannelConf, kubeConfigFile string, flannelIPv6Masq bool, nm netMode) error {
5555
extIface, err := LookupExtInterface(flannelIface, nm)
5656
if err != nil {
5757
return pkgerrors.WithMessage(err, "failed to find the interface")
@@ -80,7 +80,7 @@ func flannel(ctx context.Context, flannelIface *net.Interface, flannelConf, kube
8080
return pkgerrors.WithMessage(err, "failed to create the flannel backend")
8181
}
8282

83-
bn, err := be.RegisterNetwork(ctx, &sync.WaitGroup{}, config)
83+
bn, err := be.RegisterNetwork(ctx, wg, config)
8484
if err != nil {
8585
return pkgerrors.WithMessage(err, "failed to register flannel network")
8686
}

pkg/agent/flannel/setup.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ import (
55
"errors"
66
"fmt"
77
"net"
8-
"os"
98
"path/filepath"
109
goruntime "runtime"
1110
"strings"
11+
"sync"
1212

1313
agentutil "github.com/k3s-io/k3s/pkg/agent/util"
1414
"github.com/k3s-io/k3s/pkg/daemons/config"
15+
"github.com/k3s-io/k3s/pkg/signals"
1516
"github.com/k3s-io/k3s/pkg/util"
1617
pkgerrors "github.com/pkg/errors"
1718
"github.com/sirupsen/logrus"
@@ -73,7 +74,7 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error {
7374
return createFlannelConf(nodeConfig)
7475
}
7576

76-
func Run(ctx context.Context, nodeConfig *config.Node) error {
77+
func Run(ctx context.Context, wg *sync.WaitGroup, nodeConfig *config.Node) error {
7778
logrus.Infof("Starting flannel with backend %s", nodeConfig.FlannelBackend)
7879

7980
kubeConfig := nodeConfig.AgentConfig.KubeConfigKubelet
@@ -104,12 +105,11 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
104105
return pkgerrors.WithMessage(err, "failed to check netMode for flannel")
105106
}
106107
go func() {
107-
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConfFile, kubeConfig, nodeConfig.FlannelIPv6Masq, nm)
108+
err := flannel(ctx, wg, nodeConfig.FlannelIface, nodeConfig.FlannelConfFile, kubeConfig, nodeConfig.FlannelIPv6Masq, nm)
108109
if err != nil && !errors.Is(err, context.Canceled) {
109-
logrus.Errorf("flannel exited: %v", err)
110-
os.Exit(1)
110+
signals.RequestShutdown(pkgerrors.WithMessage(err, "flannel exited"))
111111
}
112-
os.Exit(0)
112+
signals.RequestShutdown(nil)
113113
}()
114114

115115
return nil

pkg/agent/netpol/netpol.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func init() {
4646
// https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/cmd/kube-router.go#L78
4747
// It converts the k3s config.Node into kube-router configuration (only the
4848
// subset of options needed for netpol controller).
49-
func Run(ctx context.Context, nodeConfig *config.Node) error {
49+
func Run(ctx context.Context, wg *sync.WaitGroup, nodeConfig *config.Node) error {
5050
set, err := utils.NewIPSet(false)
5151
if err != nil {
5252
logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", err)
@@ -119,9 +119,6 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
119119
stopCh := ctx.Done()
120120
healthCh := make(chan *healthcheck.ControllerHeartbeat)
121121

122-
// We don't use this WaitGroup, but kube-router components require it.
123-
var wg sync.WaitGroup
124-
125122
informerFactory := informers.NewSharedInformerFactory(client, 0)
126123
podInformer := informerFactory.Core().V1().Pods().Informer()
127124
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
@@ -176,10 +173,10 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
176173
hc.SetAlive()
177174

178175
wg.Add(1)
179-
go hc.RunCheck(healthCh, stopCh, &wg)
176+
go hc.RunCheck(healthCh, stopCh, wg)
180177

181178
wg.Add(1)
182-
go metricsRunCheck(mc, healthCh, stopCh, &wg)
179+
go metricsRunCheck(mc, healthCh, stopCh, wg)
183180

184181
npc, err := netpol.NewNetworkPolicyController(client, krConfig, podInformer, npInformer, nsInformer, &sync.Mutex{}, nil,
185182
iptablesCmdHandlers, ipSetHandlers)
@@ -193,7 +190,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
193190

194191
wg.Add(1)
195192
logrus.Infof("Starting network policy controller version %s, built on %s, %s", version.Version, version.BuildDate, runtime.Version())
196-
go npc.Run(healthCh, stopCh, &wg)
193+
go npc.Run(healthCh, stopCh, wg)
197194

198195
return nil
199196
}

pkg/agent/run.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
goruntime "runtime"
1111
"strconv"
1212
"strings"
13+
"sync"
1314
"time"
1415

1516
systemd "github.com/coreos/go-systemd/v22/daemon"
@@ -32,6 +33,7 @@ import (
3233
"github.com/k3s-io/k3s/pkg/nodeconfig"
3334
"github.com/k3s-io/k3s/pkg/profile"
3435
"github.com/k3s-io/k3s/pkg/rootless"
36+
"github.com/k3s-io/k3s/pkg/signals"
3537
"github.com/k3s-io/k3s/pkg/spegel"
3638
"github.com/k3s-io/k3s/pkg/util"
3739
"github.com/k3s-io/k3s/pkg/version"
@@ -147,34 +149,29 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
147149
}
148150
}
149151

150-
if nodeConfig.Docker {
151-
if err := executor.Docker(ctx, nodeConfig); err != nil {
152-
return err
153-
}
154-
} else if nodeConfig.ContainerRuntimeEndpoint == "" {
155-
if err := containerd.SetupContainerdConfig(nodeConfig); err != nil {
156-
return err
157-
}
158-
if err := executor.Containerd(ctx, nodeConfig); err != nil {
159-
return err
160-
}
161-
} else {
162-
if err := executor.CRI(ctx, nodeConfig); err != nil {
163-
return err
164-
}
165-
}
152+
// Create a new context to use for agent components that is cancelled on a
153+
// delay after the signal context. This allows other things (like etcd) to
154+
// clean up, before agent components exit when their contexts are cancelled.
155+
ctx = util.DelayCancel(ctx, util.DefaultContextDelay)
166156

167157
notifySocket := os.Getenv("NOTIFY_SOCKET")
168158
os.Unsetenv("NOTIFY_SOCKET")
169159

160+
go func() {
161+
if err := startCRI(ctx, nodeConfig); err != nil {
162+
signals.RequestShutdown(pkgerrors.WithMessage(err, "failed to start container runtime"))
163+
}
164+
}()
165+
170166
if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
171167
return err
172168
}
173169

174170
go func() {
175171
<-executor.APIServerReadyChan()
176-
if err := startNetwork(ctx, nodeConfig); err != nil {
177-
logrus.Fatalf("Failed to start networking: %v", err)
172+
if err := startNetwork(ctx, &sync.WaitGroup{}, nodeConfig); err != nil {
173+
signals.RequestShutdown(pkgerrors.WithMessage(err, "failed to start networking"))
174+
return
178175
}
179176

180177
// By default, the server is responsible for notifying systemd
@@ -189,9 +186,23 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
189186
return nil
190187
}
191188

189+
// startCRI starts the configured CRI, or waits for an external CRI to be ready.
190+
func startCRI(ctx context.Context, nodeConfig *daemonconfig.Node) error {
191+
if nodeConfig.Docker {
192+
return executor.Docker(ctx, nodeConfig)
193+
} else if nodeConfig.ContainerRuntimeEndpoint == "" {
194+
if err := containerd.SetupContainerdConfig(nodeConfig); err != nil {
195+
return err
196+
}
197+
return executor.Containerd(ctx, nodeConfig)
198+
} else {
199+
return executor.CRI(ctx, nodeConfig)
200+
}
201+
}
202+
192203
// startNetwork updates the network annotations on the node, and starts flannel
193204
// and the kube-router netpol controller, if enabled.
194-
func startNetwork(ctx context.Context, nodeConfig *daemonconfig.Node) error {
205+
func startNetwork(ctx context.Context, wg *sync.WaitGroup, nodeConfig *daemonconfig.Node) error {
195206
// Use the kubelet kubeconfig to update annotations on the local node
196207
kubeletClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
197208
if err != nil {
@@ -203,13 +214,13 @@ func startNetwork(ctx context.Context, nodeConfig *daemonconfig.Node) error {
203214
}
204215

205216
if !nodeConfig.NoFlannel {
206-
if err := flannel.Run(ctx, nodeConfig); err != nil {
217+
if err := flannel.Run(ctx, wg, nodeConfig); err != nil {
207218
return err
208219
}
209220
}
210221

211222
if !nodeConfig.AgentConfig.DisableNPC {
212-
if err := netpol.Run(ctx, nodeConfig); err != nil {
223+
if err := netpol.Run(ctx, wg, nodeConfig); err != nil {
213224
return err
214225
}
215226
}
@@ -264,7 +275,7 @@ func getConntrackConfig(nodeConfig *daemonconfig.Node) (*kubeproxyconfig.KubePro
264275
// RunStandalone bootstraps the executor, but does not run the kubelet or containerd.
265276
// This allows other bits of code that expect the executor to be set up properly to function
266277
// even when the agent is disabled.
267-
func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
278+
func RunStandalone(ctx context.Context, wg *sync.WaitGroup, cfg cmds.Agent) error {
268279
proxy, err := createProxyAndValidateToken(ctx, &cfg)
269280
if err != nil {
270281
return err
@@ -308,7 +319,7 @@ func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
308319

309320
// Run sets up cgroups, configures the LB proxy, and triggers startup
310321
// of containerd and kubelet.
311-
func Run(ctx context.Context, cfg cmds.Agent) error {
322+
func Run(ctx context.Context, wg *sync.WaitGroup, cfg cmds.Agent) error {
312323
if err := cgroups.Validate(); err != nil {
313324
return err
314325
}

pkg/agent/tunnel/tunnel.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ import (
1717
"github.com/k3s-io/k3s/pkg/clientaccess"
1818
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
1919
"github.com/k3s-io/k3s/pkg/daemons/executor"
20+
"github.com/k3s-io/k3s/pkg/signals"
2021
"github.com/k3s-io/k3s/pkg/util"
2122
"github.com/k3s-io/k3s/pkg/version"
23+
pkgerrors "github.com/pkg/errors"
2224
"github.com/rancher/remotedialer"
2325
"github.com/sirupsen/logrus"
2426
"github.com/yl2chen/cidranger"
@@ -117,7 +119,8 @@ func (a *agentTunnel) startWatches(ctx context.Context, config *daemonconfig.Nod
117119
Group: "discovery.k8s.io",
118120
Resource: "endpointslices",
119121
}, ""); err != nil {
120-
logrus.Fatalf("Tunnel watches failed to wait for RBAC: %v", err)
122+
signals.RequestShutdown(pkgerrors.WithMessage(err, "tunnel watches failed to wait for RBAC"))
123+
return
121124
}
122125

123126
close(rbacReady)

0 commit comments

Comments
 (0)