Skip to content

Commit 0c76ee5

Browse files
valerian-rochek8s-publishing-bot
authored andcommitted
Add unit tests for Data Consistency Detector
Kubernetes-commit: 86c4e09a78828504aba09218ff54c222685459d4
1 parent cc3d9d0 commit 0c76ee5

File tree

4 files changed

+138
-20
lines changed

4 files changed

+138
-20
lines changed

tools/cache/controller.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -596,16 +596,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
596596
// KeyLister, that way resync operations will result in the correct set
597597
// of update/delete deltas.
598598

599-
var fifo Queue
600-
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
601-
fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform)
602-
} else {
603-
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
604-
KnownObjects: clientState,
605-
EmitDeltaTypeReplaced: true,
606-
Transformer: options.Transform,
607-
})
608-
}
599+
fifo := newQueueFIFO(clientState, options.Transform)
609600

610601
cfg := &Config{
611602
Queue: fifo,
@@ -623,3 +614,15 @@ func newInformer(clientState Store, options InformerOptions) Controller {
623614
}
624615
return New(cfg)
625616
}
617+
618+
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
619+
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
620+
return NewRealFIFO(MetaNamespaceKeyFunc, clientState, transform)
621+
} else {
622+
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
623+
KnownObjects: clientState,
624+
EmitDeltaTypeReplaced: true,
625+
Transformer: transform,
626+
})
627+
}
628+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
v1 "k8s.io/api/core/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/apimachinery/pkg/watch"
30+
clientfeatures "k8s.io/client-go/features"
31+
clientfeaturestesting "k8s.io/client-go/features/testing"
32+
"k8s.io/client-go/util/consistencydetector"
33+
"k8s.io/klog/v2/ktesting"
34+
)
35+
36+
func TestReflectorDataConsistencyDetector(t *testing.T) {
37+
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
38+
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
39+
defer restore()
40+
41+
markTransformed := func(obj interface{}) (interface{}, error) {
42+
pod, ok := obj.(*v1.Pod)
43+
if !ok {
44+
return obj, nil
45+
}
46+
newPod := pod.DeepCopy()
47+
if newPod.Labels == nil {
48+
newPod.Labels = make(map[string]string)
49+
}
50+
newPod.Labels["transformed"] = "true"
51+
return newPod, nil
52+
}
53+
54+
for _, inOrder := range []bool{false, true} {
55+
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
56+
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
57+
for _, transformerEnabled := range []bool{false, true} {
58+
var transformer TransformFunc
59+
if transformerEnabled {
60+
transformer = markTransformed
61+
}
62+
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
63+
runTestReflectorDataConsistencyDetector(t, transformer)
64+
})
65+
}
66+
})
67+
}
68+
}
69+
70+
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
71+
_, ctx := ktesting.NewTestContext(t)
72+
ctx, cancel := context.WithCancel(ctx)
73+
defer cancel()
74+
75+
store := NewStore(MetaNamespaceKeyFunc)
76+
fifo := newQueueFIFO(store, transformer)
77+
78+
lw := &ListWatch{
79+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
80+
return &v1.PodList{
81+
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
82+
Items: []v1.Pod{
83+
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
84+
},
85+
}, nil
86+
},
87+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
88+
w := watch.NewFake()
89+
go func() {
90+
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
91+
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
92+
Name: "pod-1",
93+
ResourceVersion: "1",
94+
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
95+
}})
96+
}()
97+
return w, nil
98+
},
99+
}
100+
101+
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
102+
103+
go func() {
104+
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
105+
return r.LastSyncResourceVersion() != "", nil
106+
})
107+
cancel()
108+
}()
109+
110+
err := r.ListAndWatchWithContext(ctx)
111+
if err != nil {
112+
t.Errorf("ListAndWatchWithContext returned error: %v", err)
113+
}
114+
}

tools/cache/shared_informer.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -539,16 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
539539
s.startedLock.Lock()
540540
defer s.startedLock.Unlock()
541541

542-
var fifo Queue
543-
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
544-
fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform)
545-
} else {
546-
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
547-
KnownObjects: s.indexer,
548-
EmitDeltaTypeReplaced: true,
549-
Transformer: s.transform,
550-
})
551-
}
542+
fifo := newQueueFIFO(s.indexer, s.transform)
552543

553544
cfg := &Config{
554545
Queue: fifo,

util/consistencydetector/data_consistency_detector.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
4545
return dataConsistencyDetectionForWatchListEnabled
4646
}
4747

48+
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
49+
// It returns a function that restores the original value.
50+
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
51+
original := dataConsistencyDetectionForWatchListEnabled
52+
dataConsistencyDetectionForWatchListEnabled = enabled
53+
return func() {
54+
dataConsistencyDetectionForWatchListEnabled = original
55+
}
56+
}
57+
4858
type RetrieveItemsFunc[U any] func() []U
4959

5060
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)

0 commit comments

Comments
 (0)