diff --git a/components/processors/observek8sattributesprocessor/cronjobactions.go b/components/processors/observek8sattributesprocessor/cronjobactions.go new file mode 100644 index 000000000..469bb00ba --- /dev/null +++ b/components/processors/observek8sattributesprocessor/cronjobactions.go @@ -0,0 +1,25 @@ +package observek8sattributesprocessor + +import ( + batch "k8s.io/api/batch/v1" +) + +const ( + CronJobActiveKey = "active" +) + +type CronJobActiveAction struct{} + +func NewCronJobActiveAction() CronJobActiveAction { + return CronJobActiveAction{} +} + +// ---------------------------------- CronJob "active" ---------------------------------- + +// Generates the CronJob "active" facet. +// This is essentially just the length of a slice. However, since the slice's +// inner type is not of the accepted ValueTypes for OTTL's Len() function, +// computing this requires a custom processor +func (CronJobActiveAction) ComputeAttributes(cronJob batch.CronJob) (attributes, error) { + return attributes{CronJobActiveKey: len(cronJob.Status.Active)}, nil +} diff --git a/components/processors/observek8sattributesprocessor/cronjobactions_test.go b/components/processors/observek8sattributesprocessor/cronjobactions_test.go new file mode 100644 index 000000000..e8065e144 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/cronjobactions_test.go @@ -0,0 +1,24 @@ +package observek8sattributesprocessor + +import "testing" + +func TestCronJobActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Active CronJob", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/cronJobEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.active", int64(1)}, + }, + }, + { + name: "Idle CronJob", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/cronJobEventNotActive.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.active", int64(0)}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/processor.go b/components/processors/observek8sattributesprocessor/processor.go index 81c4528d8..34b0f2ae5 100644 --- a/components/processors/observek8sattributesprocessor/processor.go +++ b/components/processors/observek8sattributesprocessor/processor.go @@ -19,6 +19,7 @@ const ( EventKindPod = "Pod" EventKindNode = "Node" EventKindJob = "Job" + EventKindCronJob = "CronJob" EventKindDaemonSet = "DaemonSet" ) @@ -28,6 +29,7 @@ type K8sEventsProcessor struct { nodeActions []nodeAction podActions []podAction jobActions []jobAction + cronJobActions []cronJobAction daemonSetActions []daemonSetAction } @@ -47,6 +49,9 @@ func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsP daemonSetActions: []daemonSetAction{ NewDaemonsetSelectorAction(), }, + cronJobActions: []cronJobAction{ + NewCronJobActiveAction(), + }, } } @@ -95,11 +100,19 @@ func (kep *K8sEventsProcessor) unmarshalEvent(lr plog.LogRecord) metav1.Object { return nil } return &job + case EventKindCronJob: + var cronJob batchv1.CronJob + err := json.Unmarshal([]byte(lr.Body().AsString()), &cronJob) + if err != nil { + kep.logger.Error("failed to unmarshal CronJob event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &cronJob case EventKindDaemonSet: var daemonSet appsv1.DaemonSet err := json.Unmarshal([]byte(lr.Body().AsString()), &daemonSet) if err != nil { - kep.logger.Error("failed to unmarshal daemonSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + kep.logger.Error("failed to unmarshal DaemonSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) return nil } return &daemonSet diff --git a/components/processors/observek8sattributesprocessor/testdata/cronJobEvent.json b/components/processors/observek8sattributesprocessor/testdata/cronJobEvent.json new file mode 100644 index 000000000..1095485d2 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/cronJobEvent.json @@ -0,0 +1,129 @@ +{ + "apiVersion": "batch/v1", + "kind": "CronJob", + "metadata": { + "creationTimestamp": "2024-08-30T11:51:53Z", + "generation": 1, + "managedFields": [ + { + "apiVersion": "batch/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:spec": { + "f:concurrencyPolicy": {}, + "f:failedJobsHistoryLimit": {}, + "f:jobTemplate": { + "f:metadata": { + "f:name": {} + }, + "f:spec": { + "f:template": { + "f:spec": { + "f:containers": { + "k:{\"name\":\"my-cronjob\"}": { + ".": {}, + "f:command": {}, + "f:image": {}, + "f:imagePullPolicy": {}, + "f:name": {}, + "f:resources": {}, + "f:terminationMessagePath": {}, + "f:terminationMessagePolicy": {} + } + }, + "f:dnsPolicy": {}, + "f:restartPolicy": {}, + "f:schedulerName": {}, + "f:securityContext": {}, + "f:terminationGracePeriodSeconds": {} + } + } + } + }, + "f:schedule": {}, + "f:successfulJobsHistoryLimit": {}, + "f:suspend": {} + } + }, + "manager": "kubectl-create", + "operation": "Update", + "time": "2024-08-30T11:51:53Z" + }, + { + "apiVersion": "batch/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:active": {}, + "f:lastScheduleTime": {}, + "f:lastSuccessfulTime": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "subresource": "status", + "time": "2024-08-30T12:03:00Z" + } + ], + "name": "my-cronjob", + "namespace": "k8sexplorer", + "resourceVersion": "429516", + "uid": "e597eaec-290e-40db-93a4-cde6cb19f2f3" + }, + "spec": { + "concurrencyPolicy": "Allow", + "failedJobsHistoryLimit": 1, + "jobTemplate": { + "metadata": { + "creationTimestamp": null, + "name": "my-cronjob" + }, + "spec": { + "template": { + "metadata": { + "creationTimestamp": null + }, + "spec": { + "containers": [ + { + "command": [ + "/bin/sh", + "-c", + "echo 'Hello, World!'" + ], + "image": "busybox", + "imagePullPolicy": "Always", + "name": "my-cronjob", + "resources": {}, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File" + } + ], + "dnsPolicy": "ClusterFirst", + "restartPolicy": "OnFailure", + "schedulerName": "default-scheduler", + "securityContext": {}, + "terminationGracePeriodSeconds": 30 + } + } + } + }, + "schedule": "*/1 * * * *", + "successfulJobsHistoryLimit": 3, + "suspend": false + }, + "status": { + "active": [ + { + "apiVersion": "batch/v1", + "kind": "Job", + "name": "my-cronjob-28750323", + "namespace": "k8sexplorer", + "resourceVersion": "429515", + "uid": "037bca5b-81dd-4a4c-8f6b-340db5b38c0d" + } + ], + "lastScheduleTime": "2024-08-30T12:03:00Z", + "lastSuccessfulTime": "2024-08-30T12:02:04Z" + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/cronJobEventNotActive.json b/components/processors/observek8sattributesprocessor/testdata/cronJobEventNotActive.json new file mode 100644 index 000000000..94f54c666 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/cronJobEventNotActive.json @@ -0,0 +1,118 @@ +{ + "apiVersion": "batch/v1", + "kind": "CronJob", + "metadata": { + "creationTimestamp": "2024-08-30T11:51:53Z", + "generation": 1, + "managedFields": [ + { + "apiVersion": "batch/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:spec": { + "f:concurrencyPolicy": {}, + "f:failedJobsHistoryLimit": {}, + "f:jobTemplate": { + "f:metadata": { + "f:name": {} + }, + "f:spec": { + "f:template": { + "f:spec": { + "f:containers": { + "k:{\"name\":\"my-cronjob\"}": { + ".": {}, + "f:command": {}, + "f:image": {}, + "f:imagePullPolicy": {}, + "f:name": {}, + "f:resources": {}, + "f:terminationMessagePath": {}, + "f:terminationMessagePolicy": {} + } + }, + "f:dnsPolicy": {}, + "f:restartPolicy": {}, + "f:schedulerName": {}, + "f:securityContext": {}, + "f:terminationGracePeriodSeconds": {} + } + } + } + }, + "f:schedule": {}, + "f:successfulJobsHistoryLimit": {}, + "f:suspend": {} + } + }, + "manager": "kubectl-create", + "operation": "Update", + "time": "2024-08-30T11:51:53Z" + }, + { + "apiVersion": "batch/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:lastScheduleTime": {}, + "f:lastSuccessfulTime": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "subresource": "status", + "time": "2024-08-30T12:03:03Z" + } + ], + "name": "my-cronjob", + "namespace": "k8sexplorer", + "resourceVersion": "429546", + "uid": "e597eaec-290e-40db-93a4-cde6cb19f2f3" + }, + "spec": { + "concurrencyPolicy": "Allow", + "failedJobsHistoryLimit": 1, + "jobTemplate": { + "metadata": { + "creationTimestamp": null, + "name": "my-cronjob" + }, + "spec": { + "template": { + "metadata": { + "creationTimestamp": null + }, + "spec": { + "containers": [ + { + "command": [ + "/bin/sh", + "-c", + "echo 'Hello, World!'" + ], + "image": "busybox", + "imagePullPolicy": "Always", + "name": "my-cronjob", + "resources": {}, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File" + } + ], + "dnsPolicy": "ClusterFirst", + "restartPolicy": "OnFailure", + "schedulerName": "default-scheduler", + "securityContext": {}, + "terminationGracePeriodSeconds": 30 + } + } + } + }, + "schedule": "*/1 * * * *", + "successfulJobsHistoryLimit": 3, + "suspend": false + }, + "status": { + "lastScheduleTime": "2024-08-30T12:03:00Z", + "lastSuccessfulTime": "2024-08-30T12:03:03Z" + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/types.go b/components/processors/observek8sattributesprocessor/types.go index 98d080074..38bfa10c3 100644 --- a/components/processors/observek8sattributesprocessor/types.go +++ b/components/processors/observek8sattributesprocessor/types.go @@ -39,6 +39,9 @@ type nodeAction interface { type jobAction interface { ComputeAttributes(batchv1.Job) (attributes, error) } +type cronJobAction interface { + ComputeAttributes(batchv1.CronJob) (attributes, error) +} type daemonSetAction interface { ComputeAttributes(appsv1.DaemonSet) (attributes, error) } @@ -51,6 +54,8 @@ func (proc *K8sEventsProcessor) RunActions(obj metav1.Object) (attributes, error return proc.runNodeActions(*typed) case *batchv1.Job: return proc.runJobActions(*typed) + case *batchv1.CronJob: + return proc.runCronJobActions(*typed) case *appsv1.DaemonSet: return proc.runDaemonSetActions(*typed) } @@ -97,6 +102,18 @@ func (m *K8sEventsProcessor) runJobActions(job batchv1.Job) (attributes, error) return res, nil } +func (m *K8sEventsProcessor) runCronJobActions(cronJob batchv1.CronJob) (attributes, error) { + res := attributes{} + for _, action := range m.cronJobActions { + atts, err := action.ComputeAttributes(cronJob) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +} + func (m *K8sEventsProcessor) runDaemonSetActions(daemonset appsv1.DaemonSet) (attributes, error) { res := attributes{} for _, action := range m.daemonSetActions {