Skip to content

Commit 27a5c14

Browse files
TEP-0135: Refactor Affinity Assistant PVC creation
Part of [#6740][#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure that all of a PipelineRun's pods are scheduled to the same node. Before this commit, the PipelineRun reconciler creates PVC for each `VolumeClaimTemplate` backed workspace, and mount the PVCs to the AA to avoid PV availability zone conflict. This implementation works for `AffinityAssistantPerWorkspace` but introduces availability zone conflict issue in the `AffinityAssistantPerPipelineRun` mode since we cannot enforce all the PVC are created in the same availability zone. Instead of directly creating a PVC for each PipelineRun workspace backed by a VolumeClaimTemplate, this commit sets one VolumeClaimTemplate per PVC workspace in the affinity assistant StatefulSet spec, which enforces all VolumeClaimTemplates in StatefulSets are all provisioned on the same node/availability zone. This commit just refactors the current implementation in favor of the `AffinityAssistantPerPipelineRun` feature. There is no functionality change. The `AffinityAssistantPerPipelineRun` feature will be added in the follow up PRs. [#6740]: #6740 [tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
1 parent 264476b commit 27a5c14

File tree

4 files changed

+288
-83
lines changed

4 files changed

+288
-83
lines changed

pkg/reconciler/pipelinerun/affinity_assistant.go

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,26 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
5656
var errs []error
5757
var unschedulableNodes sets.Set[string] = nil
5858
for _, w := range wb {
59+
if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
60+
continue
61+
}
62+
63+
var claimTemplates []corev1.PersistentVolumeClaim
64+
var claims []corev1.PersistentVolumeClaimVolumeSource
65+
if w.PersistentVolumeClaim != nil {
66+
claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
67+
} else if w.VolumeClaimTemplate != nil {
68+
claimTemplate := getVolumeClaimTemplate(w, pr)
69+
claimTemplates = append(claimTemplates, *claimTemplate)
70+
}
71+
5972
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
6073
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
6174
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
62-
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
6375
switch {
6476
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
6577
case apierrors.IsNotFound(err):
66-
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
78+
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
6779
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
6880
if err != nil {
6981
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
@@ -113,14 +125,10 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
113125
return errorutils.NewAggregate(errs)
114126
}
115127

116-
func getClaimName(w v1beta1.WorkspaceBinding, ownerReference metav1.OwnerReference) string {
117-
if w.PersistentVolumeClaim != nil {
118-
return w.PersistentVolumeClaim.ClaimName
119-
} else if w.VolumeClaimTemplate != nil {
120-
return volumeclaim.GetPersistentVolumeClaimName(w.VolumeClaimTemplate, w, ownerReference)
121-
}
122-
123-
return ""
128+
func getVolumeClaimTemplate(wb v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun) *corev1.PersistentVolumeClaim {
129+
claimTemplate := wb.VolumeClaimTemplate.DeepCopy()
130+
claimTemplate.Name = volumeclaim.GetPersistentVolumeClaimName(wb.VolumeClaimTemplate, wb, *kmeta.NewControllerRef(pr))
131+
return claimTemplate
124132
}
125133

126134
func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error {
@@ -162,7 +170,7 @@ func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string)
162170
return labels
163171
}
164172

165-
func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimName string, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
173+
func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
166174
// We want a singleton pod
167175
replicas := int32(1)
168176

@@ -172,6 +180,11 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
172180
tpl = pod.MergeAAPodTemplateWithDefault(pr.Spec.PodTemplate.ToAffinityAssistantTemplate(), defaultAATpl)
173181
}
174182

183+
var mounts []corev1.VolumeMount
184+
for _, claimTemplate := range claimTemplates {
185+
mounts = append(mounts, corev1.VolumeMount{Name: claimTemplate.Name, MountPath: claimTemplate.Name})
186+
}
187+
175188
containers := []corev1.Container{{
176189
Name: "affinity-assistant",
177190
Image: affinityAssistantImage,
@@ -190,8 +203,32 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
190203
"memory": resource.MustParse("100Mi"),
191204
},
192205
},
206+
VolumeMounts: mounts,
193207
}}
194208

209+
// TODO(QuanZhang-William): when introducing `coscheduling` flag with values `coschedule-pipelineruns` or `isolate-pipelineruns
210+
// (https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md#configuration),
211+
// it may require a StorageClass with VolumeBindingMode: WaitForFirstConsumer when WS specifies PersistentVolumeClaims to avoid Availability Zone conflict
212+
// (discussed in https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md#co-locating-volumes).
213+
// We need to update the TEP and documentation for this limitation if it is the case.
214+
var volumes []corev1.Volume
215+
for i, claim := range claims {
216+
volumes = append(volumes, corev1.Volume{
217+
Name: fmt.Sprintf("workspace-%d", i),
218+
VolumeSource: corev1.VolumeSource{
219+
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
220+
// volumeBindingMode: Immediate
221+
// the PV is allocated on a Node first, and then the pod need to be
222+
// scheduled to that node.
223+
// To support those PVCs, the Affinity Assistant must also mount the
224+
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
225+
// pod is scheduled to the same Availability Zone as the PV, when using
226+
// a regional cluster. This is called VolumeScheduling.
227+
PersistentVolumeClaim: claim.DeepCopy(),
228+
},
229+
})
230+
}
231+
195232
return &appsv1.StatefulSet{
196233
TypeMeta: metav1.TypeMeta{
197234
Kind: "StatefulSet",
@@ -207,6 +244,8 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
207244
Selector: &metav1.LabelSelector{
208245
MatchLabels: getStatefulSetLabels(pr, name),
209246
},
247+
// by setting VolumeClaimTemplates from StatefulSet, all the PVs are scheduled to the same Availability Zone as the StatefulSet
248+
VolumeClaimTemplates: claimTemplates,
210249
Template: corev1.PodTemplateSpec{
211250
ObjectMeta: metav1.ObjectMeta{
212251
Labels: getStatefulSetLabels(pr, name),
@@ -219,21 +258,7 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
219258
ImagePullSecrets: tpl.ImagePullSecrets,
220259

221260
Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
222-
Volumes: []corev1.Volume{{
223-
Name: "workspace",
224-
VolumeSource: corev1.VolumeSource{
225-
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
226-
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
227-
// volumeBindingMode: Immediate
228-
// the PV is allocated on a Node first, and then the pod need to be
229-
// scheduled to that node.
230-
// To support those PVCs, the Affinity Assistant must also mount the
231-
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
232-
// pod is scheduled to the same Availability Zone as the PV, when using
233-
// a regional cluster. This is called VolumeScheduling.
234-
ClaimName: claimName,
235-
}},
236-
}},
261+
Volumes: volumes,
237262
},
238263
},
239264
},

pkg/reconciler/pipelinerun/affinity_assistant_test.go

Lines changed: 101 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -60,36 +60,108 @@ var testPipelineRun = &v1beta1.PipelineRun{
6060
}
6161

6262
// TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant
63-
// for a given PipelineRun with a PVC workspace
63+
// for a given PipelineRun
6464
func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) {
65-
ctx := context.Background()
66-
ctx, cancel := context.WithCancel(ctx)
67-
defer cancel()
68-
69-
c := Reconciler{
70-
KubeClientSet: fakek8s.NewSimpleClientset(),
71-
Images: pipeline.Images{},
65+
tests := []struct {
66+
name string
67+
pr *v1beta1.PipelineRun
68+
}{{
69+
name: "PersistentVolumeClaim Workspace type",
70+
pr: &v1beta1.PipelineRun{
71+
Spec: v1beta1.PipelineRunSpec{
72+
Workspaces: []v1beta1.WorkspaceBinding{{
73+
Name: "PersistentVolumeClaim Workspace",
74+
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
75+
ClaimName: "myclaim",
76+
},
77+
}},
78+
},
79+
},
80+
}, {
81+
name: "VolumeClaimTemplate Workspace type",
82+
pr: &v1beta1.PipelineRun{
83+
Spec: v1beta1.PipelineRunSpec{
84+
Workspaces: []v1beta1.WorkspaceBinding{
85+
{
86+
Name: "VolumeClaimTemplate Workspace",
87+
VolumeClaimTemplate: &corev1.PersistentVolumeClaim{},
88+
}},
89+
},
90+
},
91+
},
92+
{
93+
name: "other Workspace type",
94+
pr: &v1beta1.PipelineRun{
95+
Spec: v1beta1.PipelineRunSpec{
96+
Workspaces: []v1beta1.WorkspaceBinding{
97+
{
98+
Name: "EmptyDir Workspace",
99+
EmptyDir: &corev1.EmptyDirVolumeSource{},
100+
}},
101+
},
102+
},
103+
},
72104
}
73105

74-
err := c.createOrUpdateAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
75-
if err != nil {
76-
t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err)
77-
}
106+
for _, tc := range tests {
107+
tc := tc
108+
t.Run(tc.name, func(t *testing.T) {
109+
ctx := context.Background()
110+
ctx, cancel := context.WithCancel(ctx)
111+
defer cancel()
78112

79-
expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name)
80-
_, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{})
81-
if err != nil {
82-
t.Errorf("unexpected error when retrieving StatefulSet: %v", err)
83-
}
113+
c := Reconciler{
114+
KubeClientSet: fakek8s.NewSimpleClientset(),
115+
Images: pipeline.Images{},
116+
}
84117

85-
err = c.cleanupAffinityAssistants(ctx, testPipelineRun)
86-
if err != nil {
87-
t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err)
88-
}
118+
err := c.createOrUpdateAffinityAssistants(ctx, tc.pr.Spec.Workspaces, tc.pr, tc.pr.Namespace)
119+
if err != nil {
120+
t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err)
121+
}
122+
123+
expectedAAName := getAffinityAssistantName(tc.pr.Spec.Workspaces[0].Name, tc.pr.Name)
124+
aa, err := c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{})
125+
if err != nil {
126+
if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim == nil && tc.pr.Spec.Workspaces[0].VolumeClaimTemplate == nil {
127+
if !apierrors.IsNotFound(err) {
128+
t.Errorf("unexpected error when retrieving StatefulSet: %v", err)
129+
}
130+
} else {
131+
t.Errorf("unexpected error when retrieving StatefulSet: %v", err)
132+
}
133+
}
134+
135+
// validate PVs from Affinity Assistant
136+
if tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil {
137+
if len(aa.Spec.VolumeClaimTemplates) != 1 {
138+
t.Errorf("unexpected VolumeClaimTemplate count, expect 1 but got %v", len(aa.Spec.VolumeClaimTemplates))
139+
}
140+
if d := cmp.Diff(*getVolumeClaimTemplate(tc.pr.Spec.Workspaces[0], tc.pr), aa.Spec.VolumeClaimTemplates[0]); d != "" {
141+
t.Errorf("VolumeClaimTemplates diff: %s", diff.PrintWantGot(d))
142+
}
143+
} else if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil {
144+
if len(aa.Spec.Template.Spec.Volumes) != 1 {
145+
t.Errorf("unexpected PersistentVolumeClaim count, expect 1 but got %v", len(aa.Spec.Template.Spec.Volumes))
146+
}
147+
if d := cmp.Diff(tc.pr.Spec.Workspaces[0].PersistentVolumeClaim, aa.Spec.Template.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim); d != "" {
148+
t.Errorf("PersistentVolumeClaim diff: %s", diff.PrintWantGot(d))
149+
}
150+
}
151+
152+
// clean up Affinity Assistant
153+
if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil || tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil {
154+
err = c.cleanupAffinityAssistants(ctx, tc.pr)
155+
if err != nil {
156+
t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err)
157+
}
89158

90-
_, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{})
91-
if !apierrors.IsNotFound(err) {
92-
t.Errorf("expected a NotFound response, got: %v", err)
159+
_, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{})
160+
if !apierrors.IsNotFound(err) {
161+
t.Errorf("expected a NotFound response, got: %v", err)
162+
}
163+
}
164+
})
93165
}
94166
}
95167

@@ -239,7 +311,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
239311
},
240312
}
241313

242-
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil)
314+
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
243315

244316
if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
245317
t.Errorf("expected Tolerations in the StatefulSet")
@@ -277,7 +349,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
277349
}},
278350
}
279351

280-
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl)
352+
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
281353

282354
if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
283355
t.Errorf("expected Tolerations in the StatefulSet")
@@ -323,7 +395,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
323395
}},
324396
}
325397

326-
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl)
398+
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
327399

328400
if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
329401
t.Errorf("expected Tolerations from spec in the StatefulSet")
@@ -360,7 +432,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing.
360432
},
361433
}
362434

363-
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil)
435+
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
364436

365437
if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
366438
t.Errorf("expected Tolerations from spec in the StatefulSet")
@@ -380,7 +452,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing.
380452
Spec: v1beta1.PipelineRunSpec{},
381453
}
382454

383-
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, "mypvc", "nginx", nil)
455+
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
384456

385457
if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 {
386458
t.Errorf("unexpected Tolerations in the StatefulSet")

0 commit comments

Comments
 (0)