Skip to content

Commit 2d17ea9

Browse files
feat: Add CronJob custom processor
Add support for writing custom processors for the "CronJob" entity. Add custom processor to compute CronJob "active" facet
1 parent 4104eed commit 2d17ea9

File tree

6 files changed

+326
-1
lines changed

6 files changed

+326
-1
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
batch "k8s.io/api/batch/v1"
5+
)
6+
7+
const (
8+
CronJobActiveKey = "active"
9+
)
10+
11+
type CronJobActiveAction struct{}
12+
13+
func NewCronJobActiveAction() CronJobActiveAction {
14+
return CronJobActiveAction{}
15+
}
16+
17+
// ---------------------------------- CronJob "active" ----------------------------------
18+
// Generates the CronJob "active" facet.
19+
// This is essentially just the length of a slice. However, since the slice's
20+
// inner type is not of the accepted ValueTypes for OTTL's Len() function,
21+
// computing this requires a custom processor
22+
func (CronJobActiveAction) ComputeAttributes(cronJob batch.CronJob) (attributes, error) {
23+
return attributes{CronJobActiveKey: len(cronJob.Status.Active)}, nil
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package observek8sattributesprocessor
2+
3+
import "testing"
4+
5+
func TestCronJobActions(t *testing.T) {
6+
for _, testCase := range []k8sEventProcessorTest{
7+
{
8+
name: "Active CronJob",
9+
inLogs: resourceLogsFromSingleJsonEvent("./testdata/cronJobEvent.json"),
10+
expectedResults: []queryWithResult{
11+
{"observe_transform.facets.active", int64(1)},
12+
},
13+
},
14+
{
15+
name: "Idle CronJob",
16+
inLogs: resourceLogsFromSingleJsonEvent("./testdata/cronJobEventNotActive.json"),
17+
expectedResults: []queryWithResult{
18+
{"observe_transform.facets.active", int64(0)},
19+
},
20+
},
21+
} {
22+
runTest(t, testCase)
23+
}
24+
}

components/processors/observek8sattributesprocessor/processor.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919
EventKindPod = "Pod"
2020
EventKindNode = "Node"
2121
EventKindJob = "Job"
22+
EventKindCronJob = "CronJob"
2223
EventKindDaemonSet = "DaemonSet"
2324
)
2425

@@ -28,6 +29,7 @@ type K8sEventsProcessor struct {
2829
nodeActions []nodeAction
2930
podActions []podAction
3031
jobActions []jobAction
32+
cronJobActions []cronJobAction
3133
daemonSetActions []daemonSetAction
3234
}
3335

@@ -47,6 +49,9 @@ func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsP
4749
daemonSetActions: []daemonSetAction{
4850
NewDaemonsetSelectorAction(),
4951
},
52+
cronJobActions: []cronJobAction{
53+
NewCronJobActiveAction(),
54+
},
5055
}
5156
}
5257

@@ -95,11 +100,19 @@ func (kep *K8sEventsProcessor) unmarshalEvent(lr plog.LogRecord) metav1.Object {
95100
return nil
96101
}
97102
return &job
103+
case EventKindCronJob:
104+
var cronJob batchv1.CronJob
105+
err := json.Unmarshal([]byte(lr.Body().AsString()), &cronJob)
106+
if err != nil {
107+
kep.logger.Error("failed to unmarshal CronJob event %v", zap.Error(err), zap.String("event", lr.Body().AsString()))
108+
return nil
109+
}
110+
return &cronJob
98111
case EventKindDaemonSet:
99112
var daemonSet appsv1.DaemonSet
100113
err := json.Unmarshal([]byte(lr.Body().AsString()), &daemonSet)
101114
if err != nil {
102-
kep.logger.Error("failed to unmarshal daemonSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString()))
115+
kep.logger.Error("failed to unmarshal DaemonSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString()))
103116
return nil
104117
}
105118
return &daemonSet
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
{
2+
"apiVersion": "batch/v1",
3+
"kind": "CronJob",
4+
"metadata": {
5+
"creationTimestamp": "2024-08-30T11:51:53Z",
6+
"generation": 1,
7+
"managedFields": [
8+
{
9+
"apiVersion": "batch/v1",
10+
"fieldsType": "FieldsV1",
11+
"fieldsV1": {
12+
"f:spec": {
13+
"f:concurrencyPolicy": {},
14+
"f:failedJobsHistoryLimit": {},
15+
"f:jobTemplate": {
16+
"f:metadata": {
17+
"f:name": {}
18+
},
19+
"f:spec": {
20+
"f:template": {
21+
"f:spec": {
22+
"f:containers": {
23+
"k:{\"name\":\"my-cronjob\"}": {
24+
".": {},
25+
"f:command": {},
26+
"f:image": {},
27+
"f:imagePullPolicy": {},
28+
"f:name": {},
29+
"f:resources": {},
30+
"f:terminationMessagePath": {},
31+
"f:terminationMessagePolicy": {}
32+
}
33+
},
34+
"f:dnsPolicy": {},
35+
"f:restartPolicy": {},
36+
"f:schedulerName": {},
37+
"f:securityContext": {},
38+
"f:terminationGracePeriodSeconds": {}
39+
}
40+
}
41+
}
42+
},
43+
"f:schedule": {},
44+
"f:successfulJobsHistoryLimit": {},
45+
"f:suspend": {}
46+
}
47+
},
48+
"manager": "kubectl-create",
49+
"operation": "Update",
50+
"time": "2024-08-30T11:51:53Z"
51+
},
52+
{
53+
"apiVersion": "batch/v1",
54+
"fieldsType": "FieldsV1",
55+
"fieldsV1": {
56+
"f:status": {
57+
"f:active": {},
58+
"f:lastScheduleTime": {},
59+
"f:lastSuccessfulTime": {}
60+
}
61+
},
62+
"manager": "kube-controller-manager",
63+
"operation": "Update",
64+
"subresource": "status",
65+
"time": "2024-08-30T12:03:00Z"
66+
}
67+
],
68+
"name": "my-cronjob",
69+
"namespace": "k8sexplorer",
70+
"resourceVersion": "429516",
71+
"uid": "e597eaec-290e-40db-93a4-cde6cb19f2f3"
72+
},
73+
"spec": {
74+
"concurrencyPolicy": "Allow",
75+
"failedJobsHistoryLimit": 1,
76+
"jobTemplate": {
77+
"metadata": {
78+
"creationTimestamp": null,
79+
"name": "my-cronjob"
80+
},
81+
"spec": {
82+
"template": {
83+
"metadata": {
84+
"creationTimestamp": null
85+
},
86+
"spec": {
87+
"containers": [
88+
{
89+
"command": [
90+
"/bin/sh",
91+
"-c",
92+
"echo 'Hello, World!'"
93+
],
94+
"image": "busybox",
95+
"imagePullPolicy": "Always",
96+
"name": "my-cronjob",
97+
"resources": {},
98+
"terminationMessagePath": "/dev/termination-log",
99+
"terminationMessagePolicy": "File"
100+
}
101+
],
102+
"dnsPolicy": "ClusterFirst",
103+
"restartPolicy": "OnFailure",
104+
"schedulerName": "default-scheduler",
105+
"securityContext": {},
106+
"terminationGracePeriodSeconds": 30
107+
}
108+
}
109+
}
110+
},
111+
"schedule": "*/1 * * * *",
112+
"successfulJobsHistoryLimit": 3,
113+
"suspend": false
114+
},
115+
"status": {
116+
"active": [
117+
{
118+
"apiVersion": "batch/v1",
119+
"kind": "Job",
120+
"name": "my-cronjob-28750323",
121+
"namespace": "k8sexplorer",
122+
"resourceVersion": "429515",
123+
"uid": "037bca5b-81dd-4a4c-8f6b-340db5b38c0d"
124+
}
125+
],
126+
"lastScheduleTime": "2024-08-30T12:03:00Z",
127+
"lastSuccessfulTime": "2024-08-30T12:02:04Z"
128+
}
129+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
{
2+
"apiVersion": "batch/v1",
3+
"kind": "CronJob",
4+
"metadata": {
5+
"creationTimestamp": "2024-08-30T11:51:53Z",
6+
"generation": 1,
7+
"managedFields": [
8+
{
9+
"apiVersion": "batch/v1",
10+
"fieldsType": "FieldsV1",
11+
"fieldsV1": {
12+
"f:spec": {
13+
"f:concurrencyPolicy": {},
14+
"f:failedJobsHistoryLimit": {},
15+
"f:jobTemplate": {
16+
"f:metadata": {
17+
"f:name": {}
18+
},
19+
"f:spec": {
20+
"f:template": {
21+
"f:spec": {
22+
"f:containers": {
23+
"k:{\"name\":\"my-cronjob\"}": {
24+
".": {},
25+
"f:command": {},
26+
"f:image": {},
27+
"f:imagePullPolicy": {},
28+
"f:name": {},
29+
"f:resources": {},
30+
"f:terminationMessagePath": {},
31+
"f:terminationMessagePolicy": {}
32+
}
33+
},
34+
"f:dnsPolicy": {},
35+
"f:restartPolicy": {},
36+
"f:schedulerName": {},
37+
"f:securityContext": {},
38+
"f:terminationGracePeriodSeconds": {}
39+
}
40+
}
41+
}
42+
},
43+
"f:schedule": {},
44+
"f:successfulJobsHistoryLimit": {},
45+
"f:suspend": {}
46+
}
47+
},
48+
"manager": "kubectl-create",
49+
"operation": "Update",
50+
"time": "2024-08-30T11:51:53Z"
51+
},
52+
{
53+
"apiVersion": "batch/v1",
54+
"fieldsType": "FieldsV1",
55+
"fieldsV1": {
56+
"f:status": {
57+
"f:lastScheduleTime": {},
58+
"f:lastSuccessfulTime": {}
59+
}
60+
},
61+
"manager": "kube-controller-manager",
62+
"operation": "Update",
63+
"subresource": "status",
64+
"time": "2024-08-30T12:03:03Z"
65+
}
66+
],
67+
"name": "my-cronjob",
68+
"namespace": "k8sexplorer",
69+
"resourceVersion": "429546",
70+
"uid": "e597eaec-290e-40db-93a4-cde6cb19f2f3"
71+
},
72+
"spec": {
73+
"concurrencyPolicy": "Allow",
74+
"failedJobsHistoryLimit": 1,
75+
"jobTemplate": {
76+
"metadata": {
77+
"creationTimestamp": null,
78+
"name": "my-cronjob"
79+
},
80+
"spec": {
81+
"template": {
82+
"metadata": {
83+
"creationTimestamp": null
84+
},
85+
"spec": {
86+
"containers": [
87+
{
88+
"command": [
89+
"/bin/sh",
90+
"-c",
91+
"echo 'Hello, World!'"
92+
],
93+
"image": "busybox",
94+
"imagePullPolicy": "Always",
95+
"name": "my-cronjob",
96+
"resources": {},
97+
"terminationMessagePath": "/dev/termination-log",
98+
"terminationMessagePolicy": "File"
99+
}
100+
],
101+
"dnsPolicy": "ClusterFirst",
102+
"restartPolicy": "OnFailure",
103+
"schedulerName": "default-scheduler",
104+
"securityContext": {},
105+
"terminationGracePeriodSeconds": 30
106+
}
107+
}
108+
}
109+
},
110+
"schedule": "*/1 * * * *",
111+
"successfulJobsHistoryLimit": 3,
112+
"suspend": false
113+
},
114+
"status": {
115+
"lastScheduleTime": "2024-08-30T12:03:00Z",
116+
"lastSuccessfulTime": "2024-08-30T12:03:03Z"
117+
}
118+
}

components/processors/observek8sattributesprocessor/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type nodeAction interface {
3939
type jobAction interface {
4040
ComputeAttributes(batchv1.Job) (attributes, error)
4141
}
42+
type cronJobAction interface {
43+
ComputeAttributes(batchv1.CronJob) (attributes, error)
44+
}
4245
type daemonSetAction interface {
4346
ComputeAttributes(appsv1.DaemonSet) (attributes, error)
4447
}
@@ -51,6 +54,8 @@ func (proc *K8sEventsProcessor) RunActions(obj metav1.Object) (attributes, error
5154
return proc.runNodeActions(*typed)
5255
case *batchv1.Job:
5356
return proc.runJobActions(*typed)
57+
case *batchv1.CronJob:
58+
return proc.runCronJobActions(*typed)
5459
case *appsv1.DaemonSet:
5560
return proc.runDaemonSetActions(*typed)
5661
}
@@ -97,6 +102,18 @@ func (m *K8sEventsProcessor) runJobActions(job batchv1.Job) (attributes, error)
97102
return res, nil
98103
}
99104

105+
func (m *K8sEventsProcessor) runCronJobActions(cronJob batchv1.CronJob) (attributes, error) {
106+
res := attributes{}
107+
for _, action := range m.cronJobActions {
108+
atts, err := action.ComputeAttributes(cronJob)
109+
if err != nil {
110+
return res, err
111+
}
112+
res.addAttributes(atts)
113+
}
114+
return res, nil
115+
}
116+
100117
func (m *K8sEventsProcessor) runDaemonSetActions(daemonset appsv1.DaemonSet) (attributes, error) {
101118
res := attributes{}
102119
for _, action := range m.daemonSetActions {

0 commit comments

Comments
 (0)