Skip to content

Commit 5ea3f26

Browse files
authored
Merge pull request #4948 from cyclinder/ipam/clean_ep
Remove outdated endpoints to prevent blocking the creation of new Pods
2 parents 101046e + bb465fa commit 5ea3f26

File tree

14 files changed

+226
-51
lines changed

14 files changed

+226
-51
lines changed

charts/spiderpool/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ helm install spiderpool spiderpool/spiderpool --wait --namespace kube-system \
135135
| `ipam.enableKubevirtStaticIP` | the feature to keep kubevirt vm pod static IP | `true` |
136136
| `ipam.enableIPConflictDetection` | enable IP conflict detection | `false` |
137137
| `ipam.enableGatewayDetection` | enable gateway detection | `false` |
138+
| `ipam.enableCleanOutdatedEndpoint` | enable clean outdated endpoint | `false` |
138139
| `ipam.spiderSubnet.enable` | SpiderSubnet feature. | `true` |
139140
| `ipam.spiderSubnet.autoPool.enable` | SpiderSubnet Auto IPPool feature. | `true` |
140141
| `ipam.spiderSubnet.autoPool.defaultRedundantIPNumber` | the default redundant IP number of SpiderSubnet feature auto-created IPPools | `1` |

charts/spiderpool/templates/configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ data:
2020
enableIPv6: {{ .Values.ipam.enableIPv6 }}
2121
enableStatefulSet: {{ .Values.ipam.enableStatefulSet }}
2222
enableKubevirtStaticIP: {{ .Values.ipam.enableKubevirtStaticIP }}
23+
enableCleanOutdatedEndpoint: {{ .Values.ipam.enableCleanOutdatedEndpoint }}
2324
enableSpiderSubnet: {{ .Values.ipam.spiderSubnet.enable }}
2425
enableAutoPoolForApplication: {{ .Values.ipam.spiderSubnet.autoPool.enable }}
2526
enableIPConflictDetection: {{ .Values.ipam.enableIPConflictDetection }}

charts/spiderpool/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ ipam:
5959
## @param ipam.enableGatewayDetection enable gateway detection
6060
enableGatewayDetection: false
6161

62+
## @param ipam.enableCleanOutdatedEndpoint enable clean outdated endpoint
63+
enableCleanOutdatedEndpoint: false
64+
6265
spiderSubnet:
6366
## @param ipam.spiderSubnet.enable SpiderSubnet feature.
6467
enable: true

cmd/spiderpool-controller/cmd/daemon.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,8 @@ func initGCManager(ctx context.Context) {
399399
gcIPConfig.EnableStatefulSet = controllerContext.Cfg.EnableStatefulSet
400400
// EnableKubevirtStaticIP was determined by Configmap.
401401
gcIPConfig.EnableKubevirtStaticIP = controllerContext.Cfg.EnableKubevirtStaticIP
402+
// EnableCleanOutdatedEndpoint was determined by Configmap.
403+
gcIPConfig.EnableCleanOutdatedEndpoint = controllerContext.Cfg.EnableCleanOutdatedEndpoint
402404
gcIPConfig.LeaderRetryElectGap = time.Duration(controllerContext.Cfg.LeaseRetryGap) * time.Second
403405
gcManager, err := gcmanager.NewGCManager(
404406
controllerContext.ClientSet,

docs/concepts/ipam-des-zh_CN.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,21 @@ NOTE:
188188

189189
- 对于节点重启等意外情况导致 Pod 的 Sandbox 容器重启,Pod 状态为 Running 但 status 中的 podIPs 字段被清空,Spiderpool 之前会将其 IP 地址回收,但可能会导致 IP 地址的重复分配。目前 Spiderpool 默认不会回收该状态的 Pod。该功能可通过环境变量 [spiderpool-controller ENV](./../reference/spiderpool-controller.md#env)`SPIDERPOOL_GC_ENABLE_STATELESS_RUNNING_POD_ON_EMPTY_POD_STATUS_IPS` 控制(默认为 false)。
190190

191+
### 回收僵尸 SpiderEndpoint
192+
193+
Spiderpool 会周期性扫描 SpiderEndpoint,如果发现 IP 池中的某个 Pod 对应的 IP 分配记录已经不存在,但是其 SpiderEndpoint 对象仍然存在,Spiderpool 将会回收该 SpiderEndpoint。该功能可通过 `spiderpool-conf` ConfigMap 开启或关闭,默认为 false:
194+
195+
apiVersion: v1
196+
kind: ConfigMap
197+
metadata:
198+
name: spiderpool-conf
199+
namespace: spiderpool
200+
data:
201+
conf.yml: |
202+
...
203+
enableCleanOutdatedEndpoint: true
204+
...
205+
191206
### IP 冲突检测和网关可达性检测
192207

193208
对于 Underlay 网络,IP 冲突是无法接受的,这可能会造成严重的问题。Spiderpool 支持 IP 冲突检测和网关可达性检测,该功能以前由 coordinator 插件实现,由于可能会导致一些潜在的通信问题。现在由 IPAM 完成。

docs/concepts/ipam-des.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,23 @@ The above complete IP recovery algorithm can ensure the correct recovery of IP a
212212

213213
- For the **stateless** Pod in the `Running` phase, Spiderpool will not release its IP address when the Pod's `status.podIPs` is empty. This feature can be controlled by the environment variable `SPIDERPOOL_GC_ENABLE_STATELESS_RUNNING_POD_ON_EMPTY_POD_STATUS_IPS`.
214214

215+
### Clean Outdated SpiderEndpoint
216+
217+
Spiderpool periodically scans SpiderEndpoints. If it finds that the IP allocation record for a Pod in the IP pool no longer exists, but the corresponding SpiderEndpoint object still exists, Spiderpool will reclaim that SpiderEndpoint. This feature can be enabled or disabled via the `spiderpool-conf` ConfigMap, and is disabled (false) by default:
218+
219+
```yaml
220+
apiVersion: v1
221+
kind: ConfigMap
222+
metadata:
223+
name: spiderpool-conf
224+
namespace: spiderpool
225+
data:
226+
conf.yml: |
227+
...
228+
enableCleanOutdatedEndpoint: true
229+
...
230+
```
231+
215232
### IP Conflict Detection and Gateway Reachability Detection
216233

217234
For Underlay networks, IP conflicts are unacceptable as they can cause serious issues. Spiderpool supports IP conflict detection and gateway reachability detection, which were previously implemented by the coordinator plugin but could cause some potential communication problems. Now, this is handled by IPAM.

pkg/gcmanager/gc_manager.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,20 @@ import (
88
"fmt"
99
"time"
1010

11-
"go.uber.org/zap"
12-
"k8s.io/client-go/informers"
13-
"k8s.io/client-go/kubernetes"
14-
1511
"github.com/spidernet-io/spiderpool/pkg/election"
1612
"github.com/spidernet-io/spiderpool/pkg/ippoolmanager"
1713
"github.com/spidernet-io/spiderpool/pkg/kubevirtmanager"
1814
"github.com/spidernet-io/spiderpool/pkg/limiter"
15+
"github.com/spidernet-io/spiderpool/pkg/lock"
1916
"github.com/spidernet-io/spiderpool/pkg/logutils"
2017
"github.com/spidernet-io/spiderpool/pkg/nodemanager"
2118
"github.com/spidernet-io/spiderpool/pkg/podmanager"
2219
"github.com/spidernet-io/spiderpool/pkg/statefulsetmanager"
2320
"github.com/spidernet-io/spiderpool/pkg/workloadendpointmanager"
21+
22+
"go.uber.org/zap"
23+
"k8s.io/client-go/informers"
24+
"k8s.io/client-go/kubernetes"
2425
)
2526

2627
type GarbageCollectionConfig struct {
@@ -30,6 +31,7 @@ type GarbageCollectionConfig struct {
3031
EnableGCStatelessRunningPodOnEmptyPodStatusIPs bool
3132
EnableStatefulSet bool
3233
EnableKubevirtStaticIP bool
34+
EnableCleanOutdatedEndpoint bool
3335

3436
ReleaseIPWorkerNum int
3537
GCIPChannelBuffer int
@@ -77,6 +79,7 @@ type SpiderGC struct {
7779

7880
informerFactory informers.SharedInformerFactory
7981
gcLimiter limiter.Limiter
82+
Locker lock.Mutex
8083
}
8184

8285
func NewGCManager(clientSet *kubernetes.Clientset, config *GarbageCollectionConfig,
@@ -114,10 +117,9 @@ func NewGCManager(clientSet *kubernetes.Clientset, config *GarbageCollectionConf
114117
logger = logutils.Logger.Named("IP-GarbageCollection")
115118

116119
spiderGC := &SpiderGC{
117-
k8ClientSet: clientSet,
118-
PodDB: NewPodDBer(config.MaxPodEntryDatabaseCap),
119-
gcConfig: config,
120-
120+
k8ClientSet: clientSet,
121+
PodDB: NewPodDBer(config.MaxPodEntryDatabaseCap),
122+
gcConfig: config,
121123
gcSignal: make(chan struct{}, 1),
122124
gcIPPoolIPSignal: make(chan *PodEntry, config.GCIPChannelBuffer),
123125

@@ -130,6 +132,7 @@ func NewGCManager(clientSet *kubernetes.Clientset, config *GarbageCollectionConf
130132

131133
leader: spiderControllerLeader,
132134
gcLimiter: limiter.NewLimiter(limiter.LimiterConfig{}),
135+
Locker: lock.Mutex{},
133136
}
134137

135138
return spiderGC, nil

pkg/gcmanager/scanAll_IPPool.go

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import (
1313
apierrors "k8s.io/apimachinery/pkg/api/errors"
1414
"k8s.io/client-go/tools/cache"
1515

16+
corev1 "k8s.io/api/core/v1"
17+
1618
"github.com/spidernet-io/spiderpool/pkg/constant"
1719
spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1"
1820
"github.com/spidernet-io/spiderpool/pkg/logutils"
1921
"github.com/spidernet-io/spiderpool/pkg/nodemanager"
2022
"github.com/spidernet-io/spiderpool/pkg/podmanager"
2123
"github.com/spidernet-io/spiderpool/pkg/types"
2224
"github.com/spidernet-io/spiderpool/pkg/utils/convert"
23-
corev1 "k8s.io/api/core/v1"
2425
)
2526

2627
// monitorGCSignal will monitor signal from CLI, DefaultGCInterval
@@ -76,6 +77,18 @@ func (s *SpiderGC) monitorGCSignal(ctx context.Context) {
7677

7778
// executeScanAll scans the whole pod and whole IPPoolList
7879
func (s *SpiderGC) executeScanAll(ctx context.Context) {
80+
epList, err := s.wepMgr.ListEndpoints(ctx, constant.UseCache)
81+
if err != nil {
82+
logger.Sugar().Errorf("failed to list all endpoints: %v, skip clean outdated endpoint", err)
83+
return
84+
}
85+
86+
suspiciousEndpointMap := make(map[string]*spiderpoolv2beta1.WorkloadEndpointStatus)
87+
for i := range epList.Items {
88+
key := fmt.Sprintf("%s/%s", epList.Items[i].Namespace, epList.Items[i].Name)
89+
suspiciousEndpointMap[key] = &epList.Items[i].Status
90+
}
91+
7992
poolList, err := s.ippoolMgr.ListIPPools(ctx, constant.UseCache)
8093
if err != nil {
8194
if apierrors.IsNotFound(err) {
@@ -127,6 +140,16 @@ func (s *SpiderGC) executeScanAll(ctx context.Context) {
127140
flagStaticIPPod := false
128141
shouldGcstatelessTerminatingPod := false
129142
endpoint, endpointErr := s.wepMgr.GetEndpointByName(ctx, podNS, podName, constant.UseCache)
143+
if endpointErr == nil && endpoint != nil {
144+
// If we find the endpoint through the allocation information in the IP pool,
145+
// it means that this endpoint is normal. Otherwise, it is very likely
146+
// to be a leaked endpoint. We remove the normal endpoint from the
147+
// suspiciousEndpointMap so that it is NOT reclaimed later; only the entries
// still remaining in the map are cleaned up by the cleanOutdateEndpoint function.
148+
s.Locker.Lock()
149+
delete(suspiciousEndpointMap, poolIPAllocation.NamespacedName)
150+
s.Locker.Unlock()
151+
}
152+
130153
podYaml, podErr := s.podMgr.GetPodByName(ctx, podNS, podName, constant.UseCache)
131154

132155
// handle the pod not existed with the same name
@@ -399,11 +422,82 @@ func (s *SpiderGC) executeScanAll(ctx context.Context) {
399422
fnScanAll(v6poolList)
400423
}()
401424
}
402-
403425
wg.Wait()
426+
427+
// Clean up outdated endpoints where both pod and owner reference no longer exist
428+
if s.gcConfig.EnableCleanOutdatedEndpoint {
429+
if len(suspiciousEndpointMap) > 0 {
430+
logger.Sugar().Infof("Endpoint cleanup: processing %d outdated endpoints", len(suspiciousEndpointMap))
431+
s.cleanOutdateEndpoint(ctx, suspiciousEndpointMap)
432+
} else {
433+
logger.Sugar().Infof("Endpoint cleanup: no outdated endpoints found, nothing to clean")
434+
}
435+
} else {
436+
logger.Sugar().Infof("Endpoint cleanup: disabled by configuration (EnableCleanOutdatedEndpoint=false)")
437+
}
438+
404439
logger.Sugar().Debugf("IP GC scan all finished")
405440
}
406441

442+
func (s *SpiderGC) cleanOutdateEndpoint(ctx context.Context, suspiciousEndpointMap map[string]*spiderpoolv2beta1.WorkloadEndpointStatus) {
443+
logCtx := logutils.IntoContext(ctx, logger)
444+
445+
for nsNameKey, status := range suspiciousEndpointMap {
446+
namespace, name, err := cache.SplitMetaNamespaceKey(nsNameKey)
447+
if err != nil {
448+
logger.Sugar().Errorf("cleanOutdateEndpoint: failed to clean outdated endpoint %s/%s: %v", namespace, name, err)
449+
continue
450+
}
451+
for _, ipAllocationDetail := range status.Current.IPs {
452+
ipv4Pool := ipAllocationDetail.IPv4Pool
453+
ipv6Pool := ipAllocationDetail.IPv6Pool
454+
if ipv4Pool != nil {
455+
err := s.checkEndpointExistInIPPool(logCtx, namespace, name, *ipv4Pool, logger, status)
456+
if err != nil {
457+
logger.Sugar().Errorf("cleanOutdateEndpoint: failed to clean outdated endpoint %s/%s: %v", namespace, name, err)
458+
}
459+
}
460+
461+
if ipv6Pool != nil {
462+
err := s.checkEndpointExistInIPPool(logCtx, namespace, name, *ipv6Pool, logger, status)
463+
if err != nil {
464+
logger.Sugar().Errorf("cleanOutdateEndpoint: failed to clean outdated endpoint %s/%s: %v", namespace, name, err)
465+
}
466+
}
467+
}
468+
}
469+
logger.Sugar().Debugf("Finished cleaning outdated endpoints")
470+
}
471+
472+
func (s *SpiderGC) checkEndpointExistInIPPool(ctx context.Context, epNamespace, epName, poolName string, logger *zap.Logger, status *spiderpoolv2beta1.WorkloadEndpointStatus) error {
473+
pool, err := s.ippoolMgr.GetIPPoolByName(ctx, poolName, constant.IgnoreCache)
474+
if err != nil {
475+
return err
476+
}
477+
478+
poolAllocatedIPs, err := convert.UnmarshalIPPoolAllocatedIPs(pool.Status.AllocatedIPs)
479+
if err != nil {
480+
return err
481+
}
482+
483+
endpointKey := epNamespace + "/" + epName
484+
for _, poolIPAllocation := range poolAllocatedIPs {
485+
if poolIPAllocation.NamespacedName == endpointKey {
486+
logger.Sugar().Debugf("Endpoint validation: endpoint %s has active IP allocation in pool %s, skipping cleanup", endpointKey, poolName)
487+
return nil
488+
}
489+
}
490+
491+
logger.Sugar().Debugf("Endpoint cleanup: endpoint %s has no IP allocation in pool %s, proceeding with cleanup", endpointKey, poolName)
492+
err = s.wepMgr.ReleaseEndpointAndFinalizer(ctx, epNamespace, epName, constant.IgnoreCache)
493+
if err != nil {
494+
logger.Sugar().Errorf("Endpoint cleanup: failed to remove endpoint %s: %v", endpointKey, err)
495+
} else {
496+
logger.Sugar().Infof("Endpoint cleanup: successfully removed outdated endpoint %s", endpointKey)
497+
}
498+
return nil
499+
}
500+
407501
// Helps check if it is a valid static Pod (StatefulSet or Kubevirt), if it is a valid static Pod. Return true
408502
func (s *SpiderGC) isValidStatefulsetOrKubevirt(ctx context.Context, logger *zap.Logger, podNS, podName, poolIP, ownerControllerType string) (bool, error) {
409503
if s.gcConfig.EnableStatefulSet && ownerControllerType == constant.KindStatefulSet {

pkg/gcmanager/tracePod_worker.go

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -49,37 +49,36 @@ func (s *SpiderGC) handlePodEntryForTracingTimeOut(podEntry *PodEntry) {
4949
if podEntry.TracingStopTime.IsZero() {
5050
logger.Sugar().Warnf("unknown podEntry: %+v", podEntry)
5151
return
52-
} else {
53-
if time.Now().UTC().After(podEntry.TracingStopTime) {
54-
// If the statefulset application quickly experiences scaling down and up,
55-
// check whether `Status.PodIPs` is empty to determine whether the Pod in the current K8S has completed the normal IP release to avoid releasing the wrong IP.
56-
ctx := context.TODO()
57-
currentPodYaml, err := s.podMgr.GetPodByName(ctx, podEntry.Namespace, podEntry.PodName, constant.UseCache)
58-
if err != nil {
59-
tracingReason := fmt.Sprintf("the graceful deletion period of pod '%s/%s' is over, get the current pod status in Kubernetes", podEntry.Namespace, podEntry.PodName)
60-
if apierrors.IsNotFound(err) {
61-
logger.With(zap.Any("podEntry tracing-reason", tracingReason)).
62-
Sugar().Debugf("pod '%s/%s' not found", podEntry.Namespace, podEntry.PodName)
63-
} else {
64-
logger.With(zap.Any("podEntry tracing-reason", tracingReason)).
65-
Sugar().Errorf("failed to get pod '%s/%s', error: %v", podEntry.Namespace, podEntry.PodName, err)
66-
// the pod will be handled next time.
67-
return
68-
}
52+
}
53+
54+
if time.Now().UTC().After(podEntry.TracingStopTime) {
55+
// If the statefulset application quickly experiences scaling down and up,
56+
// check whether `Status.PodIPs` is empty to determine whether the Pod in the current K8S has completed the normal IP release to avoid releasing the wrong IP.
57+
ctx := context.TODO()
58+
currentPodYaml, err := s.podMgr.GetPodByName(ctx, podEntry.Namespace, podEntry.PodName, constant.UseCache)
59+
if err != nil {
60+
tracingReason := fmt.Sprintf("the graceful deletion period of pod '%s/%s' is over, get the current pod status in Kubernetes", podEntry.Namespace, podEntry.PodName)
61+
if apierrors.IsNotFound(err) {
62+
logger.With(zap.Any("podEntry tracing-reason", tracingReason)).
63+
Sugar().Debugf("pod '%s/%s' not found", podEntry.Namespace, podEntry.PodName)
6964
} else {
70-
if len(currentPodYaml.Status.PodIPs) == 0 {
71-
logger.Sugar().Infof("The IP address of the Pod %v that has exceeded the grace period has been released through cmdDel, ignore it.", podEntry.PodName)
72-
s.PodDB.DeletePodEntry(podEntry.Namespace, podEntry.PodName)
73-
return
74-
}
65+
logger.With(zap.Any("podEntry tracing-reason", tracingReason)).
66+
Sugar().Errorf("failed to get pod '%s/%s', error: %v", podEntry.Namespace, podEntry.PodName, err)
67+
// the pod will be handled next time.
68+
return
7569
}
76-
77-
logger.With(zap.Any("podEntry tracing-reason", podEntry.PodTracingReason)).
78-
Sugar().Infof("the graceful deletion period of pod '%s/%s' is over, try to release the IP address.", podEntry.Namespace, podEntry.PodName)
7970
} else {
80-
// not time out
81-
return
71+
if len(currentPodYaml.Status.PodIPs) == 0 {
72+
logger.Sugar().Infof("The IP address of the Pod %v that has exceeded the grace period has been released through cmdDel, to ensure the IP is released, trigger this pod entry to gc again.", podEntry.PodName)
73+
s.PodDB.DeletePodEntry(podEntry.Namespace, podEntry.PodName)
74+
}
8275
}
76+
77+
logger.With(zap.Any("podEntry tracing-reason", podEntry.PodTracingReason)).
78+
Sugar().Infof("the graceful deletion period of pod '%s/%s' is over, try to release the IP address.", podEntry.Namespace, podEntry.PodName)
79+
} else {
80+
// not time out
81+
return
8382
}
8483

8584
select {
@@ -100,7 +99,7 @@ func (s *SpiderGC) releaseIPPoolIPExecutor(ctx context.Context, workerIndex int)
10099
case podCache := <-s.gcIPPoolIPSignal:
101100
err := func() error {
102101
endpoint, err := s.wepMgr.GetEndpointByName(ctx, podCache.Namespace, podCache.PodName, constant.UseCache)
103-
if nil != err {
102+
if err != nil {
104103
if apierrors.IsNotFound(err) {
105104
log.Sugar().Infof("SpiderEndpoint '%s/%s' not found, maybe already cleaned by cmdDel or ScanAll",
106105
podCache.Namespace, podCache.PodName)

0 commit comments

Comments
 (0)