Skip to content

feat: Add CronJob custom processor #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package observek8sattributesprocessor

import (
batch "k8s.io/api/batch/v1"
)

const (
CronJobActiveKey = "active"
)

type CronJobActiveAction struct{}

func NewCronJobActiveAction() CronJobActiveAction {
return CronJobActiveAction{}
}

// ---------------------------------- CronJob "active" ----------------------------------

// Generates the CronJob "active" facet.
// This is essentially just the length of a slice. However, since the slice's
// inner type is not of the accepted ValueTypes for OTTL's Len() function,
// computing this requires a custom processor
func (CronJobActiveAction) ComputeAttributes(cronJob batch.CronJob) (attributes, error) {
return attributes{CronJobActiveKey: len(cronJob.Status.Active)}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package observek8sattributesprocessor

import "testing"

func TestCronJobActions(t *testing.T) {
for _, testCase := range []k8sEventProcessorTest{
{
name: "Active CronJob",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/cronJobEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.active", int64(1)},
},
},
{
name: "Idle CronJob",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/cronJobEventNotActive.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.active", int64(0)},
},
},
} {
runTest(t, testCase)
}
}
15 changes: 14 additions & 1 deletion components/processors/observek8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
EventKindPod = "Pod"
EventKindNode = "Node"
EventKindJob = "Job"
EventKindCronJob = "CronJob"
EventKindDaemonSet = "DaemonSet"
)

Expand All @@ -28,6 +29,7 @@ type K8sEventsProcessor struct {
nodeActions []nodeAction
podActions []podAction
jobActions []jobAction
cronJobActions []cronJobAction
daemonSetActions []daemonSetAction
}

Expand All @@ -47,6 +49,9 @@ func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsP
daemonSetActions: []daemonSetAction{
NewDaemonsetSelectorAction(),
},
cronJobActions: []cronJobAction{
NewCronJobActiveAction(),
},
}
}

Expand Down Expand Up @@ -95,11 +100,19 @@ func (kep *K8sEventsProcessor) unmarshalEvent(lr plog.LogRecord) metav1.Object {
return nil
}
return &job
case EventKindCronJob:
var cronJob batchv1.CronJob
err := json.Unmarshal([]byte(lr.Body().AsString()), &cronJob)
if err != nil {
kep.logger.Error("failed to unmarshal CronJob event %v", zap.Error(err), zap.String("event", lr.Body().AsString()))
return nil
}
return &cronJob
case EventKindDaemonSet:
var daemonSet appsv1.DaemonSet
err := json.Unmarshal([]byte(lr.Body().AsString()), &daemonSet)
if err != nil {
kep.logger.Error("failed to unmarshal daemonSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString()))
kep.logger.Error("failed to unmarshal DaemonSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString()))
return nil
}
return &daemonSet
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
{
"apiVersion": "batch/v1",
"kind": "CronJob",
"metadata": {
"creationTimestamp": "2024-08-30T11:51:53Z",
"generation": 1,
"managedFields": [
{
"apiVersion": "batch/v1",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:spec": {
"f:concurrencyPolicy": {},
"f:failedJobsHistoryLimit": {},
"f:jobTemplate": {
"f:metadata": {
"f:name": {}
},
"f:spec": {
"f:template": {
"f:spec": {
"f:containers": {
"k:{\"name\":\"my-cronjob\"}": {
".": {},
"f:command": {},
"f:image": {},
"f:imagePullPolicy": {},
"f:name": {},
"f:resources": {},
"f:terminationMessagePath": {},
"f:terminationMessagePolicy": {}
}
},
"f:dnsPolicy": {},
"f:restartPolicy": {},
"f:schedulerName": {},
"f:securityContext": {},
"f:terminationGracePeriodSeconds": {}
}
}
}
},
"f:schedule": {},
"f:successfulJobsHistoryLimit": {},
"f:suspend": {}
}
},
"manager": "kubectl-create",
"operation": "Update",
"time": "2024-08-30T11:51:53Z"
},
{
"apiVersion": "batch/v1",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:status": {
"f:active": {},
"f:lastScheduleTime": {},
"f:lastSuccessfulTime": {}
}
},
"manager": "kube-controller-manager",
"operation": "Update",
"subresource": "status",
"time": "2024-08-30T12:03:00Z"
}
],
"name": "my-cronjob",
"namespace": "k8sexplorer",
"resourceVersion": "429516",
"uid": "e597eaec-290e-40db-93a4-cde6cb19f2f3"
},
"spec": {
"concurrencyPolicy": "Allow",
"failedJobsHistoryLimit": 1,
"jobTemplate": {
"metadata": {
"creationTimestamp": null,
"name": "my-cronjob"
},
"spec": {
"template": {
"metadata": {
"creationTimestamp": null
},
"spec": {
"containers": [
{
"command": [
"/bin/sh",
"-c",
"echo 'Hello, World!'"
],
"image": "busybox",
"imagePullPolicy": "Always",
"name": "my-cronjob",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "OnFailure",
"schedulerName": "default-scheduler",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
}
},
"schedule": "*/1 * * * *",
"successfulJobsHistoryLimit": 3,
"suspend": false
},
"status": {
"active": [
{
"apiVersion": "batch/v1",
"kind": "Job",
"name": "my-cronjob-28750323",
"namespace": "k8sexplorer",
"resourceVersion": "429515",
"uid": "037bca5b-81dd-4a4c-8f6b-340db5b38c0d"
}
],
"lastScheduleTime": "2024-08-30T12:03:00Z",
"lastSuccessfulTime": "2024-08-30T12:02:04Z"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
{
"apiVersion": "batch/v1",
"kind": "CronJob",
"metadata": {
"creationTimestamp": "2024-08-30T11:51:53Z",
"generation": 1,
"managedFields": [
{
"apiVersion": "batch/v1",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:spec": {
"f:concurrencyPolicy": {},
"f:failedJobsHistoryLimit": {},
"f:jobTemplate": {
"f:metadata": {
"f:name": {}
},
"f:spec": {
"f:template": {
"f:spec": {
"f:containers": {
"k:{\"name\":\"my-cronjob\"}": {
".": {},
"f:command": {},
"f:image": {},
"f:imagePullPolicy": {},
"f:name": {},
"f:resources": {},
"f:terminationMessagePath": {},
"f:terminationMessagePolicy": {}
}
},
"f:dnsPolicy": {},
"f:restartPolicy": {},
"f:schedulerName": {},
"f:securityContext": {},
"f:terminationGracePeriodSeconds": {}
}
}
}
},
"f:schedule": {},
"f:successfulJobsHistoryLimit": {},
"f:suspend": {}
}
},
"manager": "kubectl-create",
"operation": "Update",
"time": "2024-08-30T11:51:53Z"
},
{
"apiVersion": "batch/v1",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:status": {
"f:lastScheduleTime": {},
"f:lastSuccessfulTime": {}
}
},
"manager": "kube-controller-manager",
"operation": "Update",
"subresource": "status",
"time": "2024-08-30T12:03:03Z"
}
],
"name": "my-cronjob",
"namespace": "k8sexplorer",
"resourceVersion": "429546",
"uid": "e597eaec-290e-40db-93a4-cde6cb19f2f3"
},
"spec": {
"concurrencyPolicy": "Allow",
"failedJobsHistoryLimit": 1,
"jobTemplate": {
"metadata": {
"creationTimestamp": null,
"name": "my-cronjob"
},
"spec": {
"template": {
"metadata": {
"creationTimestamp": null
},
"spec": {
"containers": [
{
"command": [
"/bin/sh",
"-c",
"echo 'Hello, World!'"
],
"image": "busybox",
"imagePullPolicy": "Always",
"name": "my-cronjob",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "OnFailure",
"schedulerName": "default-scheduler",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
}
},
"schedule": "*/1 * * * *",
"successfulJobsHistoryLimit": 3,
"suspend": false
},
"status": {
"lastScheduleTime": "2024-08-30T12:03:00Z",
"lastSuccessfulTime": "2024-08-30T12:03:03Z"
}
}
17 changes: 17 additions & 0 deletions components/processors/observek8sattributesprocessor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type nodeAction interface {
type jobAction interface {
ComputeAttributes(batchv1.Job) (attributes, error)
}
type cronJobAction interface {
ComputeAttributes(batchv1.CronJob) (attributes, error)
}
type daemonSetAction interface {
ComputeAttributes(appsv1.DaemonSet) (attributes, error)
}
Expand All @@ -51,6 +54,8 @@ func (proc *K8sEventsProcessor) RunActions(obj metav1.Object) (attributes, error
return proc.runNodeActions(*typed)
case *batchv1.Job:
return proc.runJobActions(*typed)
case *batchv1.CronJob:
return proc.runCronJobActions(*typed)
case *appsv1.DaemonSet:
return proc.runDaemonSetActions(*typed)
}
Expand Down Expand Up @@ -97,6 +102,18 @@ func (m *K8sEventsProcessor) runJobActions(job batchv1.Job) (attributes, error)
return res, nil
}

func (m *K8sEventsProcessor) runCronJobActions(cronJob batchv1.CronJob) (attributes, error) {
res := attributes{}
for _, action := range m.cronJobActions {
atts, err := action.ComputeAttributes(cronJob)
if err != nil {
return res, err
}
res.addAttributes(atts)
}
return res, nil
}

func (m *K8sEventsProcessor) runDaemonSetActions(daemonset appsv1.DaemonSet) (attributes, error) {
res := attributes{}
for _, action := range m.daemonSetActions {
Expand Down
Loading