diff --git a/components/processors/observek8sattributesprocessor/daemonsetactions.go b/components/processors/observek8sattributesprocessor/daemonsetactions.go index 55d703328..b9ba427f7 100644 --- a/components/processors/observek8sattributesprocessor/daemonsetactions.go +++ b/components/processors/observek8sattributesprocessor/daemonsetactions.go @@ -15,10 +15,9 @@ func NewDaemonsetSelectorAction() DaemonSetSelectorAction { return DaemonSetSelectorAction{} } -// ---------------------------------- Daemonset "selector" ---------------------------------- +// ---------------------------------- DaemonSet "selector" ---------------------------------- -// Generates the Daemonset "status" facet. Same logic as kubectl printer -// https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L1204 +// Generates the Daemonset "selector" facet. func (DaemonSetSelectorAction) ComputeAttributes(daemonset appsv1.DaemonSet) (attributes, error) { selecotString := metav1.FormatLabelSelector(daemonset.Spec.Selector) return attributes{DaemonsetSelectorAttributeKey: selecotString}, nil diff --git a/components/processors/observek8sattributesprocessor/endpointsactions.go b/components/processors/observek8sattributesprocessor/endpointsactions.go new file mode 100644 index 000000000..23409d757 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/endpointsactions.go @@ -0,0 +1,45 @@ +package observek8sattributesprocessor + +import ( + "net" + "strconv" + + corev1 "k8s.io/api/core/v1" +) + +const ( + EnpdointsAttributeKey = "endpoints" +) + +// ---------------------------------- Endpoints "endpoints" ---------------------------------- + +type EndpointsStatusAction struct{} + +func NewEndpointsStatusAction() EndpointsStatusAction { + return EndpointsStatusAction{} +} + +// Generates the Endpoints "endpoints" facet, which is a list of all individual endpoints, encoded as strings +func (EndpointsStatusAction) ComputeAttributes(endpoints corev1.Endpoints) (attributes, error) { + list := []string{} + for _, ss := range endpoints.Subsets { + if len(ss.Ports) == 0 { + // It's possible to have headless services with no ports. + for i := range ss.Addresses { + list = append(list, ss.Addresses[i].IP) + } + // avoid nesting code too deeply + continue + } + + // "Normal" services with ports defined. + for _, port := range ss.Ports { + for i := range ss.Addresses { + addr := &ss.Addresses[i] + hostPort := net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))) + list = append(list, hostPort) + } + } + } + return attributes{EnpdointsAttributeKey: list}, nil +} diff --git a/components/processors/observek8sattributesprocessor/endpointsactions_test.go b/components/processors/observek8sattributesprocessor/endpointsactions_test.go new file mode 100644 index 000000000..7e41138e5 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/endpointsactions_test.go @@ -0,0 +1,17 @@ +package observek8sattributesprocessor + +import "testing" + +func TestEndpointsActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Endpoints", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/endpointsEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.endpoints", []any{"10.244.0.53:5432"}}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/ingressactions.go b/components/processors/observek8sattributesprocessor/ingressactions.go new file mode 100644 index 000000000..975646405 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/ingressactions.go @@ -0,0 +1,67 @@ +package observek8sattributesprocessor + +import ( + "strings" + + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + IngressRulesAttributeKey = "rules" + IngressLoadBalancerAttributeKey = "loadBalancer" +) + +// Adapted from https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L1373 +func formatIngressRules(rules []netv1.IngressRule) string { + list := []string{} + for _, rule := range rules { + list = append(list, rule.Host) + } + if len(list) == 0 { + return "*" + } + ret := strings.Join(list, ",") + return ret +} + +// ---------------------------------- Ingress "rules" ---------------------------------- +type IngressRulesAction struct{} + +func NewIngressRulesAction() IngressRulesAction { + return IngressRulesAction{} +} + +// Generates the Ingress "rules" facet. +func (IngressRulesAction) ComputeAttributes(ingress netv1.Ingress) (attributes, error) { + return attributes{IngressRulesAttributeKey: formatIngressRules(ingress.Spec.Rules)}, nil +} + +// ---------------------------------- Ingress "loadBalancer" ---------------------------------- +type IngressLoadBalancerAction struct{} + +func NewIngressLoadBalancerAction() IngressLoadBalancerAction { + return IngressLoadBalancerAction{} +} + +// Adapted from https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L1420 +// (removed wide option and always extract full info) +func ingressLoadBalancerStatusStringer(s netv1.IngressLoadBalancerStatus) string { + ingress := s.Ingress + result := sets.NewString() + for i := range ingress { + if ingress[i].IP != "" { + result.Insert(ingress[i].IP) + } else if ingress[i].Hostname != "" { + result.Insert(ingress[i].Hostname) + } + } + + r := strings.Join(result.List(), ",") + return r +} + +// Generates the Ingress "loadBalancer" facet. +func (IngressLoadBalancerAction) ComputeAttributes(ingress netv1.Ingress) (attributes, error) { + return attributes{IngressLoadBalancerAttributeKey: ingressLoadBalancerStatusStringer(ingress.Status.LoadBalancer)}, nil +} diff --git a/components/processors/observek8sattributesprocessor/ingressactions_test.go b/components/processors/observek8sattributesprocessor/ingressactions_test.go new file mode 100644 index 000000000..5f1893da9 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/ingressactions_test.go @@ -0,0 +1,24 @@ +package observek8sattributesprocessor + +import "testing" + +func TestIngressActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Ingress rules", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/ingressEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.rules", "prometheus.observe-eng.com"}, + }, + }, + { + name: "Ingress rules", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/ingressEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.loadBalancer", "someUniqueElbIdentifier.elb.us-west-2.amazonaws.com"}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/nodeactions.go b/components/processors/observek8sattributesprocessor/nodeactions.go index 9f0e0927f..76f6abd99 100644 --- a/components/processors/observek8sattributesprocessor/nodeactions.go +++ b/components/processors/observek8sattributesprocessor/nodeactions.go @@ -107,9 +107,5 @@ func (NodeRolesAction) ComputeAttributes(node v1.Node) (attributes, error) { } } - ret := make([]any, 0, roles.Len()) - for _, role := range roles.List() { - ret = append(ret, role) - } - return attributes{NodeRolesAttributeKey: ret}, nil + return attributes{NodeRolesAttributeKey: roles.List()}, nil } diff --git a/components/processors/observek8sattributesprocessor/persistentvolumeactions.go b/components/processors/observek8sattributesprocessor/persistentvolumeactions.go new file mode 100644 index 000000000..40ac0f563 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/persistentvolumeactions.go @@ -0,0 +1,71 @@ +package observek8sattributesprocessor + +import ( + corev1 "k8s.io/api/core/v1" +) + +const ( + PersistentVolumeTypeAttributeKey = "volumeType" +) + +type PersistentVolumeTypeAction struct{} + +func NewPersistentVolumeTypeAction() PersistentVolumeTypeAction { + return PersistentVolumeTypeAction{} +} + +// Generates the PersistentVolume "type" facet. +func (PersistentVolumeTypeAction) ComputeAttributes(pvc corev1.PersistentVolume) (attributes, error) { + spec := pvc.Spec.PersistentVolumeSource + var persistentVolumeType string + switch { + case spec.GCEPersistentDisk != nil: + persistentVolumeType = "GCEPersistentDisk" + case spec.AWSElasticBlockStore != nil: + persistentVolumeType = "AWSElasticBlockStore" + case spec.HostPath != nil: + persistentVolumeType = "HostPath" + case spec.Glusterfs != nil: + persistentVolumeType = "Glusterfs" + case spec.NFS != nil: + persistentVolumeType = "NFS" + case spec.RBD != nil: + persistentVolumeType = "RBD" + case spec.ISCSI != nil: + persistentVolumeType = "ISCSI" + case spec.Cinder != nil: + persistentVolumeType = "Cinder" + case spec.CephFS != nil: + persistentVolumeType = "CephFS" + case spec.FC != nil: + persistentVolumeType = "FC" + case spec.Flocker != nil: + persistentVolumeType = "Flocker" + case spec.FlexVolume != nil: + persistentVolumeType = "FlexVolume" + case spec.AzureFile != nil: + persistentVolumeType = "AzureFile" + case spec.VsphereVolume != nil: + persistentVolumeType = "VsphereVolume" + case spec.Quobyte != nil: + persistentVolumeType = "Quobyte" + case spec.AzureDisk != nil: + persistentVolumeType = "AzureDisk" + case spec.PhotonPersistentDisk != nil: + persistentVolumeType = "PhotonPersistentDisk" + case spec.PortworxVolume != nil: + persistentVolumeType = "PortworxVolume" + case spec.ScaleIO != nil: + persistentVolumeType = "ScaleIO" + case spec.Local != nil: + persistentVolumeType = "Local" + case spec.StorageOS != nil: + persistentVolumeType = "StorageOS" + case spec.CSI != nil: + persistentVolumeType = "CSI" + default: + // This should never happen, since exactly one of the above should be set + persistentVolumeType = "Unknown" + } + return attributes{PersistentVolumeTypeAttributeKey: persistentVolumeType}, nil +} diff --git a/components/processors/observek8sattributesprocessor/persistentvolumeactions_test.go b/components/processors/observek8sattributesprocessor/persistentvolumeactions_test.go new file mode 100644 index 000000000..c13087fc2 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/persistentvolumeactions_test.go @@ -0,0 +1,24 @@ +package observek8sattributesprocessor + +import "testing" + +func TestPersistentVolumeActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Extract PersistentVolume type (AWSElasticBlockStore)", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/persistentVolumeAWSElasticBlockStoreEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.volumeType", "AWSElasticBlockStore"}, + }, + }, + { + name: "Extract PersistentVolume type (HostPath)", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/persistentVolumeHostPathEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.volumeType", "HostPath"}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/persistentvolumeclaimactions.go b/components/processors/observek8sattributesprocessor/persistentvolumeclaimactions.go new file mode 100644 index 000000000..6ff293b6f --- /dev/null +++ b/components/processors/observek8sattributesprocessor/persistentvolumeclaimactions.go @@ -0,0 +1,22 @@ +package observek8sattributesprocessor + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + PersistentVolumeClaimSelectorAttributeKey = "selector" +) + +type PersistentVolumeClaimSelectorAction struct{} + +func NewPersistentVolumeClaimSelectorAction() PersistentVolumeClaimSelectorAction { + return PersistentVolumeClaimSelectorAction{} +} + +// Generates the PersistentVolumeClaim "selector" facet. +func (PersistentVolumeClaimSelectorAction) ComputeAttributes(pvc corev1.PersistentVolumeClaim) (attributes, error) { + selecotString := metav1.FormatLabelSelector(pvc.Spec.Selector) + return attributes{PersistentVolumeClaimSelectorAttributeKey: selecotString}, nil +} diff --git a/components/processors/observek8sattributesprocessor/persistentvolumeclaimactions_test.go b/components/processors/observek8sattributesprocessor/persistentvolumeclaimactions_test.go new file mode 100644 index 000000000..f6dec69c2 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/persistentvolumeclaimactions_test.go @@ -0,0 +1,17 @@ +package observek8sattributesprocessor + +import "testing" + +func TestPersistentVolumeClaimActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Pretty print of a PersistentVolumeClaim's selector", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/persistentVolumeClaimEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.selector", "environment in (production,staging),storage-tier=high-performance"}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/podactions.go b/components/processors/observek8sattributesprocessor/podactions.go index 39452c9a3..fab80d648 100644 --- a/components/processors/observek8sattributesprocessor/podactions.go +++ b/components/processors/observek8sattributesprocessor/podactions.go @@ -19,6 +19,18 @@ const ( PodReadinessGatesTotalAttributeKey = "readinessGatesTotal" PodConditionsAttributeKey = "conditions" + + PodCronJobNameAttributeKey = "cronJobName" + OwnerKindCronJob = "CronJob" + + PodJobNameAttributeKey = "jobName" + OwnerKindJob = "Job" + + PodDaemonSetNameAttributeKey = "daemonSetName" + OwnerKindDaemonSet = "DaemonSet" + + PodStatefulSetNameAttributeKey = "statefulSetName" + OwnerKindStatefulSet = "StatefulSet" ) // ---------------------------------- Pod "status" ---------------------------------- @@ -190,3 +202,75 @@ func (PodConditionsAction) ComputeAttributes(pod v1.Pod) (attributes, error) { // This facet's value is a map itself with k-v pairs, keyed with strings return attributes{PodConditionsAttributeKey: conditions}, nil } + +// ---------------------------------- Pod "jobName" ---------------------------------- + +type PodJobNameAction struct{} + +func NewPodJobAction() PodJobNameAction { + return PodJobNameAction{} +} + +// Name of the job this Pod belongs to (only present if the owner is a Job resource) +func (PodJobNameAction) ComputeAttributes(pod v1.Pod) (attributes, error) { + for _, ref := range pod.OwnerReferences { + if ref.Kind == OwnerKindJob { + return attributes{PodJobNameAttributeKey: ref.Name}, nil + } + } + return attributes{}, nil +} + +// ---------------------------------- Pod "cronJobName" ---------------------------------- + +type PodCronJobNameAction struct{} + +func NewPodCronJobNameAction() PodCronJobNameAction { + return PodCronJobNameAction{} +} + +// Name of the cronJob this Pod belongs to (only present if the owner is a CronJobName resource) +func (PodCronJobNameAction) ComputeAttributes(pod v1.Pod) (attributes, error) { + for _, ref := range pod.OwnerReferences { + if ref.Kind == OwnerKindCronJob { + return attributes{PodCronJobNameAttributeKey: ref.Name}, nil + } + } + return attributes{}, nil +} + +// ---------------------------------- Pod "daemonSetName" ---------------------------------- + +type PodDaemonSetNameAction struct{} + +func NewPodDaemonSetNameAction() PodDaemonSetNameAction { + return PodDaemonSetNameAction{} +} + +// Name of the cronJob this Pod belongs to (only present if the owner is a DaemonSetName resource) +func (PodDaemonSetNameAction) ComputeAttributes(pod v1.Pod) (attributes, error) { + for _, ref := range pod.OwnerReferences { + if ref.Kind == OwnerKindDaemonSet { + return attributes{PodDaemonSetNameAttributeKey: ref.Name}, nil + } + } + return attributes{}, nil +} + +// ---------------------------------- Pod "statefulSetName" ---------------------------------- + +type PodStatefulSetNameAction struct{} + +func NewPodStatefulSetNameAction() PodStatefulSetNameAction { + return PodStatefulSetNameAction{} +} + +// Name of the cronJob this Pod belongs to (only present if the owner is a StatefulSetName resource) +func (PodStatefulSetNameAction) ComputeAttributes(pod v1.Pod) (attributes, error) { + for _, ref := range pod.OwnerReferences { + if ref.Kind == OwnerKindStatefulSet { + return attributes{PodStatefulSetNameAttributeKey: ref.Name}, nil + } + } + return attributes{}, nil +} diff --git a/components/processors/observek8sattributesprocessor/processor.go b/components/processors/observek8sattributesprocessor/processor.go index 34b0f2ae5..d8cd06b01 100644 --- a/components/processors/observek8sattributesprocessor/processor.go +++ b/components/processors/observek8sattributesprocessor/processor.go @@ -12,25 +12,49 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( - EventKindPod = "Pod" - EventKindNode = "Node" - EventKindJob = "Job" - EventKindCronJob = "CronJob" - EventKindDaemonSet = "DaemonSet" + // CORE + EventKindPod = "Pod" + EventKindNode = "Node" + EventKindServiceAccount = "ServiceAccount" + EventKindEndpoints = "Endpoints" + // APPS + EventKindStatefulSet = "StatefulSet" + EventKindDaemonSet = "DaemonSet" + // WORKLOAD + EventKindJob = "Job" + EventKindCronJob = "CronJob" + // STORAGE + EventKindPersistentVolume = "PersistentVolume" + EventKindPersistentVolumeClaim = "PersistentVolumeClaim" + // NETWORK + EventKindIngress = "Ingress" ) type K8sEventsProcessor struct { - cfg component.Config - logger *zap.Logger + cfg component.Config + logger *zap.Logger + nodeActions []nodeAction podActions []podAction - jobActions []jobAction - cronJobActions []cronJobAction - daemonSetActions []daemonSetAction + endpointsActions []endpointsAction + + jobActions []jobAction + cronJobActions []cronJobAction + + daemonSetActions []daemonSetAction + statefulSetActions []statefulSetAction + + persistentVolumeActions []persistentVolumeAction + persistentVolumeClaimActions []persistentVolumeClaimAction + + ingressActions []ingressAction + + serviceAccountActions []serviceAccountAction } func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsProcessor { @@ -43,14 +67,36 @@ func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsP nodeActions: []nodeAction{ NewNodeStatusAction(), NewNodeRolesAction(), NewNodePoolAction(), }, + endpointsActions: []endpointsAction{ + NewEndpointsStatusAction(), + }, jobActions: []jobAction{ NewJobStatusAction(), NewJobDurationAction(), }, + cronJobActions: []cronJobAction{ + NewCronJobActiveAction(), + }, + daemonSetActions: []daemonSetAction{ NewDaemonsetSelectorAction(), }, - cronJobActions: []cronJobAction{ - NewCronJobActiveAction(), + statefulSetActions: []statefulSetAction{ + NewStatefulsetSelectorAction(), + }, + + persistentVolumeActions: []persistentVolumeAction{ + NewPersistentVolumeTypeAction(), + }, + persistentVolumeClaimActions: []persistentVolumeClaimAction{ + NewPersistentVolumeClaimSelectorAction(), + }, + + ingressActions: []ingressAction{ + NewIngressRulesAction(), NewIngressLoadBalancerAction(), + }, + + serviceAccountActions: []serviceAccountAction{ + NewServiceAccountSecretsNamesAction(), }, } } @@ -92,6 +138,38 @@ func (kep *K8sEventsProcessor) unmarshalEvent(lr plog.LogRecord) metav1.Object { return nil } return &pod + case EventKindEndpoints: + var endpoints corev1.Endpoints + err := json.Unmarshal([]byte(lr.Body().AsString()), &endpoints) + if err != nil { + kep.logger.Error("failed to unmarshal Endpoints event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &endpoints + case EventKindServiceAccount: + var sa corev1.ServiceAccount + err := json.Unmarshal([]byte(lr.Body().AsString()), &sa) + if err != nil { + kep.logger.Error("failed to unmarshal ServiceAccount event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &sa + case EventKindPersistentVolumeClaim: + var persistentVolumeClaim corev1.PersistentVolumeClaim + err := json.Unmarshal([]byte(lr.Body().AsString()), &persistentVolumeClaim) + if err != nil { + kep.logger.Error("failed to unmarshal PersistentVolumeClaim event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &persistentVolumeClaim + case EventKindPersistentVolume: + var persistentVolume corev1.PersistentVolume + err := json.Unmarshal([]byte(lr.Body().AsString()), &persistentVolume) + if err != nil { + kep.logger.Error("failed to unmarshal PersistentVolume event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &persistentVolume case EventKindJob: var job batchv1.Job err := json.Unmarshal([]byte(lr.Body().AsString()), &job) @@ -108,6 +186,14 @@ func (kep *K8sEventsProcessor) unmarshalEvent(lr plog.LogRecord) metav1.Object { return nil } return &cronJob + case EventKindStatefulSet: + var statefulSet appsv1.StatefulSet + err := json.Unmarshal([]byte(lr.Body().AsString()), &statefulSet) + if err != nil { + kep.logger.Error("failed to unmarshal StatefulSet event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &statefulSet case EventKindDaemonSet: var daemonSet appsv1.DaemonSet err := json.Unmarshal([]byte(lr.Body().AsString()), &daemonSet) @@ -116,6 +202,14 @@ func (kep *K8sEventsProcessor) unmarshalEvent(lr plog.LogRecord) metav1.Object { return nil } return &daemonSet + case EventKindIngress: + var ingress netv1.Ingress + err := json.Unmarshal([]byte(lr.Body().AsString()), &ingress) + if err != nil { + kep.logger.Error("failed to unmarshal Ingress event %v", zap.Error(err), zap.String("event", lr.Body().AsString())) + return nil + } + return &ingress default: return nil } @@ -215,7 +309,7 @@ func mapPut(theMap pcommon.Map, key string, value any) error { theMap.PutBool(key, typed) case float64: theMap.PutDouble(key, typed) - case []any: + case []string: slc := theMap.PutEmptySlice(key) slc.EnsureCapacity(len(typed)) for _, elem := range typed { diff --git a/components/processors/observek8sattributesprocessor/serviceaccountactions.go b/components/processors/observek8sattributesprocessor/serviceaccountactions.go new file mode 100644 index 000000000..785df2c9c --- /dev/null +++ b/components/processors/observek8sattributesprocessor/serviceaccountactions.go @@ -0,0 +1,27 @@ +package observek8sattributesprocessor + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + ServiceAccountSecretsNamesAttributeKey = "secretsNames" +) + +// ---------------------------------- ServiceAccount "secretsNames" ---------------------------------- +type ServiceAccountSecretsNamesAction struct{} + +func NewServiceAccountSecretsNamesAction() ServiceAccountSecretsNamesAction { + return ServiceAccountSecretsNamesAction{} +} + +// Generates the ServiceAccount "secretsNames" facet. +func (ServiceAccountSecretsNamesAction) ComputeAttributes(serviceAccount corev1.ServiceAccount) (attributes, error) { + result := sets.NewString() + for _, secret := range serviceAccount.Secrets { + result.Insert(secret.Name) + } + + return attributes{ServiceAccountSecretsNamesAttributeKey: result.List()}, nil +} diff --git a/components/processors/observek8sattributesprocessor/serviceaccountactions_test.go b/components/processors/observek8sattributesprocessor/serviceaccountactions_test.go new file mode 100644 index 000000000..0a31ab27c --- /dev/null +++ b/components/processors/observek8sattributesprocessor/serviceaccountactions_test.go @@ -0,0 +1,17 @@ +package observek8sattributesprocessor + +import "testing" + +func TestServiceAccountActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Service account secrets", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/serviceAccountEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.secretsNames", []any{"example-another-secret", "example-serviceaccount-token-abcdef"}}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/statefulsetactions.go b/components/processors/observek8sattributesprocessor/statefulsetactions.go new file mode 100644 index 000000000..5d11433a3 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/statefulsetactions.go @@ -0,0 +1,24 @@ +package observek8sattributesprocessor + +import ( + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + StatefulsetSelectorAttributeKey = "selector" +) + +type StatefulSetSelectorAction struct{} + +func NewStatefulsetSelectorAction() StatefulSetSelectorAction { + return StatefulSetSelectorAction{} +} + +// ---------------------------------- StatefulSet "selector" ---------------------------------- + +// Generates the Statefulset "selector" facet. +func (StatefulSetSelectorAction) ComputeAttributes(statefulSet appsv1.StatefulSet) (attributes, error) { + selecotString := metav1.FormatLabelSelector(statefulSet.Spec.Selector) + return attributes{StatefulsetSelectorAttributeKey: selecotString}, nil +} diff --git a/components/processors/observek8sattributesprocessor/statefulsetactions_test.go b/components/processors/observek8sattributesprocessor/statefulsetactions_test.go new file mode 100644 index 000000000..45ae2c11b --- /dev/null +++ b/components/processors/observek8sattributesprocessor/statefulsetactions_test.go @@ -0,0 +1,17 @@ +package observek8sattributesprocessor + +import "testing" + +func TestStatefulSetActions(t *testing.T) { + for _, testCase := range []k8sEventProcessorTest{ + { + name: "Pretty print of a StatefulSet's selector", + inLogs: resourceLogsFromSingleJsonEvent("./testdata/statefulSetEvent.json"), + expectedResults: []queryWithResult{ + {"observe_transform.facets.selector", "app=redis-ephemeral"}, + }, + }, + } { + runTest(t, testCase) + } +} diff --git a/components/processors/observek8sattributesprocessor/testdata/endpointsEvent.json b/components/processors/observek8sattributesprocessor/testdata/endpointsEvent.json new file mode 100644 index 000000000..85084c14a --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/endpointsEvent.json @@ -0,0 +1,64 @@ +{ + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "annotations": { + "endpoints.kubernetes.io/last-change-trigger-time": "2024-09-01T22:14:53Z" + }, + "creationTimestamp": "2024-09-01T22:14:51Z", + "labels": { + "app": "postgres", + "environment": "sandbox" + }, + "managedFields": [ + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + ".": {}, + "f:endpoints.kubernetes.io/last-change-trigger-time": {} + }, + "f:labels": { + ".": {}, + "f:app": {}, + "f:environment": {} + } + }, + "f:subsets": {} + }, + "manager": "kube-controller-manager", + "operation": "Update", + "time": "2024-09-01T22:14:53Z" + } + ], + "name": "postgres", + "namespace": "sandbox", + "resourceVersion": "12943492", + "uid": "020c3ca3-e5d5-4e04-af9d-b910c844996f" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "10.244.0.53", + "nodeName": "ip-172-31-34-201", + "targetRef": { + "kind": "Pod", + "name": "postgres-bb5559b7-z45cn", + "namespace": "sandbox", + "uid": "7c130e23-f9e0-4240-ab82-bb24e435f3b4" + } + } + ], + "ports": [ + { + "name": "postgres", + "port": 5432, + "protocol": "TCP" + } + ] + } + ] +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/ingressEvent.json b/components/processors/observek8sattributesprocessor/testdata/ingressEvent.json new file mode 100644 index 000000000..c949c73f6 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/ingressEvent.json @@ -0,0 +1,112 @@ +{ + "apiVersion": "networking.k8s.io/v1", + "kind": "Ingress", + "metadata": { + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"networking.k8s.io/v1\",\"kind\":\"Ingress\",\"metadata\":{\"annotations\":{\"kubernetes.io/ingress.class\":\"nginx\",\"nginx.ingress.kubernetes.io/auth-signin\":\"https://$host/oauth2/start?rd=$request_uri\",\"nginx.ingress.kubernetes.io/auth-url\":\"https://$host/oauth2/auth\",\"o11y.io/audit\":\"2023-10-31\",\"o11y.io/team\":\"Infra\",\"observeinc.com/kubectl-nonce\":\"1\"},\"labels\":{\"observeinc.com/app\":\"prometheus\",\"observeinc.com/environment\":\"eng\"},\"name\":\"prometheus\",\"namespace\":\"monitoring\"},\"spec\":{\"rules\":[{\"host\":\"prometheus.observe-eng.com\",\"http\":{\"paths\":[{\"backend\":{\"service\":{\"name\":\"prometheus\",\"port\":{\"name\":\"prometheus\"}}},\"path\":\"/\",\"pathType\":\"ImplementationSpecific\"}]}}],\"tls\":[{\"hosts\":[\"prometheus.observe-eng.com\"],\"secretName\":\"starcert-tls\"}]}}\n", + "kubernetes.io/ingress.class": "nginx", + "nginx.ingress.kubernetes.io/auth-signin": "https://$host/oauth2/start?rd=$request_uri", + "nginx.ingress.kubernetes.io/auth-url": "https://$host/oauth2/auth", + "o11y.io/audit": "2023-10-31", + "o11y.io/team": "Infra", + "observeinc.com/kubectl-nonce": "1" + }, + "creationTimestamp": "2024-04-25T22:51:42Z", + "generation": 1, + "labels": { + "observeinc.com/app": "prometheus", + "observeinc.com/environment": "eng" + }, + "managedFields": [ + { + "apiVersion": "networking.k8s.io/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + ".": {}, + "f:kubectl.kubernetes.io/last-applied-configuration": {}, + "f:kubernetes.io/ingress.class": {}, + "f:nginx.ingress.kubernetes.io/auth-signin": {}, + "f:nginx.ingress.kubernetes.io/auth-url": {}, + "f:o11y.io/audit": {}, + "f:o11y.io/team": {}, + "f:observeinc.com/kubectl-nonce": {} + }, + "f:labels": { + ".": {}, + "f:observeinc.com/app": {}, + "f:observeinc.com/environment": {} + } + }, + "f:spec": { + "f:rules": {}, + "f:tls": {} + } + }, + "manager": "kubectl-client-side-apply", + "operation": "Update", + "time": "2024-04-25T22:51:42Z" + }, + { + "apiVersion": "networking.k8s.io/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:loadBalancer": { + "f:ingress": {} + } + } + }, + "manager": "nginx-ingress-controller", + "operation": "Update", + "subresource": "status", + "time": "2024-04-26T17:44:59Z" + } + ], + "name": "prometheus", + "namespace": "monitoring", + "resourceVersion": "741757", + "uid": "4cb12882-1113-40b2-b4ec-e3a0dad283a7" + }, + "spec": { + "rules": [ + { + "host": "prometheus.observe-eng.com", + "http": { + "paths": [ + { + "backend": { + "service": { + "name": "prometheus", + "port": { + "name": "prometheus" + } + } + }, + "path": "/", + "pathType": "ImplementationSpecific" + } + ] + } + } + ], + "tls": [ + { + "hosts": [ + "prometheus.observe-eng.com" + ], + "secretName": "starcert-tls" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "hostname": "someUniqueElbIdentifier.elb.us-west-2.amazonaws.com" + } + ] + } + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/persistentVolumeAWSElasticBlockStoreEvent.json b/components/processors/observek8sattributesprocessor/testdata/persistentVolumeAWSElasticBlockStoreEvent.json new file mode 100644 index 000000000..6a71c9dda --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/persistentVolumeAWSElasticBlockStoreEvent.json @@ -0,0 +1,168 @@ +{ + "apiVersion": "v1", + "kind": "PersistentVolume", + "metadata": { + "annotations": { + "pv.kubernetes.io/migrated-to": "ebs.csi.aws.com", + "pv.kubernetes.io/provisioned-by": "kubernetes.io/aws-ebs", + "volume.kubernetes.io/provisioner-deletion-secret-name": "", + "volume.kubernetes.io/provisioner-deletion-secret-namespace": "" + }, + "creationTimestamp": "2024-04-25T22:52:30Z", + "finalizers": [ + "kubernetes.io/pv-protection", + "external-attacher/ebs-csi-aws-com" + ], + "labels": { + "topology.kubernetes.io/region": "us-west-2", + "topology.kubernetes.io/zone": "us-west-2a" + }, + "managedFields": [ + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + ".": {}, + "f:pv.kubernetes.io/provisioned-by": {}, + "f:volume.kubernetes.io/provisioner-deletion-secret-name": {}, + "f:volume.kubernetes.io/provisioner-deletion-secret-namespace": {} + }, + "f:labels": { + ".": {}, + "f:topology.kubernetes.io/region": {}, + "f:topology.kubernetes.io/zone": {} + } + }, + "f:spec": { + "f:accessModes": {}, + "f:awsElasticBlockStore": { + ".": {}, + "f:fsType": {}, + "f:volumeID": {} + }, + "f:capacity": { + ".": {}, + "f:storage": {} + }, + "f:claimRef": { + ".": {}, + "f:apiVersion": {}, + "f:kind": {}, + "f:name": {}, + "f:namespace": {}, + "f:resourceVersion": {}, + "f:uid": {} + }, + "f:nodeAffinity": { + ".": {}, + "f:required": {} + }, + "f:persistentVolumeReclaimPolicy": {}, + "f:storageClassName": {}, + "f:volumeMode": {} + } + }, + "manager": "csi-provisioner", + "operation": "Update", + "time": "2024-04-25T22:52:30Z" + }, + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + "f:pv.kubernetes.io/migrated-to": {} + } + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "time": "2024-04-25T22:52:30Z" + }, + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:phase": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "subresource": "status", + "time": "2024-04-25T22:52:30Z" + }, + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:finalizers": { + "v:\"external-attacher/ebs-csi-aws-com\"": {} + } + } + }, + "manager": "csi-attacher", + "operation": "Update", + "time": "2024-04-25T22:52:31Z" + } + ], + "name": "pvc-2d374650-85f4-48ea-8839-0596bf28a610", + "resourceVersion": "440336", + "uid": "1920cbe4-686a-4f3b-bdea-50fed4bc5f3e" + }, + "spec": { + "accessModes": [ + "ReadWriteOnce" + ], + "awsElasticBlockStore": { + "fsType": "ext4", + "volumeID": "vol-05b88c083764c0c5b" + }, + "capacity": { + "storage": "4Gi" + }, + "claimRef": { + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "name": "datadir-kafka-zookeeper-00-1", + "namespace": "kafka-e", + "resourceVersion": "440297", + "uid": "2d374650-85f4-48ea-8839-0596bf28a610" + }, + "nodeAffinity": { + "required": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "topology.kubernetes.io/zone", + "operator": "In", + "values": [ + "us-west-2a" + ] + }, + { + "key": "topology.kubernetes.io/region", + "operator": "In", + "values": [ + "us-west-2" + ] + } + ] + } + ] + } + }, + "persistentVolumeReclaimPolicy": "Delete", + "storageClassName": "gp2", + "volumeMode": "Filesystem" + }, + "status": { + "lastPhaseTransitionTime": "2024-04-25T22:52:30Z", + "phase": "Bound" + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/persistentVolumeClaimEvent.json b/components/processors/observek8sattributesprocessor/testdata/persistentVolumeClaimEvent.json new file mode 100644 index 000000000..cd326e7d8 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/persistentVolumeClaimEvent.json @@ -0,0 +1,124 @@ +{ + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": { + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"v1\",\"kind\":\"PersistentVolumeClaim\",\"metadata\":{\"annotations\":{},\"name\":\"dummy-pvc\",\"namespace\":\"k8sexplorer\"},\"spec\":{\"accessModes\":[\"ReadWriteOnce\"],\"resources\":{\"requests\":{\"storage\":\"1Gi\"}},\"selector\":{\"matchExpressions\":[{\"key\":\"environment\",\"operator\":\"In\",\"values\":[\"production\",\"staging\"]}],\"matchLabels\":{\"storage-tier\":\"high-performance\"}}}}\n", + "pv.kubernetes.io/bind-completed": "yes", + "pv.kubernetes.io/bound-by-controller": "yes", + "volume.beta.kubernetes.io/storage-provisioner": "k8s.io/minikube-hostpath", + "volume.kubernetes.io/storage-provisioner": "k8s.io/minikube-hostpath" + }, + "creationTimestamp": "2024-09-03T11:05:38Z", + "finalizers": [ + "kubernetes.io/pvc-protection" + ], + "managedFields": [ + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + "f:pv.kubernetes.io/bind-completed": {}, + "f:pv.kubernetes.io/bound-by-controller": {}, + "f:volume.beta.kubernetes.io/storage-provisioner": {}, + "f:volume.kubernetes.io/storage-provisioner": {} + } + }, + "f:spec": { + "f:volumeName": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "time": "2024-09-03T11:05:38Z" + }, + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:accessModes": {}, + "f:capacity": { + ".": {}, + "f:storage": {} + }, + "f:phase": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "subresource": "status", + "time": "2024-09-03T11:05:38Z" + }, + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + ".": {}, + "f:kubectl.kubernetes.io/last-applied-configuration": {} + } + }, + "f:spec": { + "f:accessModes": {}, + "f:resources": { + "f:requests": { + ".": {}, + "f:storage": {} + } + }, + "f:selector": {}, + "f:volumeMode": {} + } + }, + "manager": "kubectl-client-side-apply", + "operation": "Update", + "time": "2024-09-03T11:05:38Z" + } + ], + "name": "dummy-pvc", + "namespace": "k8sexplorer", + "resourceVersion": "570949", + "uid": "301d82c3-2fa7-45b9-8fb3-3de3bd8d405c" + }, + "spec": { + "accessModes": [ + "ReadWriteOnce" + ], + "resources": { + "requests": { + "storage": "1Gi" + } + }, + "selector": { + "matchExpressions": [ + { + "key": "environment", + "operator": "In", + "values": [ + "production", + "staging" + ] + } + ], + "matchLabels": { + "storage-tier": "high-performance" + } + }, + "storageClassName": "standard", + "volumeMode": "Filesystem", + "volumeName": "pvc-301d82c3-2fa7-45b9-8fb3-3de3bd8d405c" + }, + "status": { + "accessModes": [ + "ReadWriteOnce" + ], + "capacity": { + "storage": "1Gi" + }, + "phase": "Bound" + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/persistentVolumeHostPathEvent.json b/components/processors/observek8sattributesprocessor/testdata/persistentVolumeHostPathEvent.json new file mode 100644 index 000000000..41c4206dd --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/persistentVolumeHostPathEvent.json @@ -0,0 +1,99 @@ +{ + "apiVersion": "v1", + "kind": "PersistentVolume", + "metadata": { + "annotations": { + "hostPathProvisionerIdentity": "f7711d29-f074-4a45-84e4-6fb6f18b8715", + "pv.kubernetes.io/provisioned-by": "k8s.io/minikube-hostpath" + }, + "creationTimestamp": "2024-09-03T11:05:38Z", + "finalizers": [ + "kubernetes.io/pv-protection" + ], + "managedFields": [ + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:phase": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "subresource": "status", + "time": "2024-09-03T11:05:38Z" + }, + { + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + ".": {}, + "f:hostPathProvisionerIdentity": {}, + "f:pv.kubernetes.io/provisioned-by": {} + } + }, + "f:spec": { + "f:accessModes": {}, + "f:capacity": { + ".": {}, + "f:storage": {} + }, + "f:claimRef": { + ".": {}, + "f:apiVersion": {}, + "f:kind": {}, + "f:name": {}, + "f:namespace": {}, + "f:resourceVersion": {}, + "f:uid": {} + }, + "f:hostPath": { + ".": {}, + "f:path": {}, + "f:type": {} + }, + "f:persistentVolumeReclaimPolicy": {}, + "f:storageClassName": {}, + "f:volumeMode": {} + } + }, + "manager": "storage-provisioner", + "operation": "Update", + "time": "2024-09-03T11:05:38Z" + } + ], + "name": "pvc-301d82c3-2fa7-45b9-8fb3-3de3bd8d405c", + "resourceVersion": "570946", + "uid": "eb72ef22-1f9e-4389-a952-31925343d6e2" + }, + "spec": { + "accessModes": [ + "ReadWriteOnce" + ], + "capacity": { + "storage": "1Gi" + }, + "claimRef": { + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "name": "dummy-pvc", + "namespace": "k8sexplorer", + "resourceVersion": "570942", + "uid": "301d82c3-2fa7-45b9-8fb3-3de3bd8d405c" + }, + "hostPath": { + "path": "/tmp/hostpath-provisioner/k8sexplorer/dummy-pvc", + "type": "" + }, + "persistentVolumeReclaimPolicy": "Delete", + "storageClassName": "standard", + "volumeMode": "Filesystem" + }, + "status": { + "lastPhaseTransitionTime": "2024-09-03T11:05:38Z", + "phase": "Bound" + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/serviceAccountEvent.json b/components/processors/observek8sattributesprocessor/testdata/serviceAccountEvent.json new file mode 100644 index 000000000..6fc40d4bb --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/serviceAccountEvent.json @@ -0,0 +1,21 @@ +{ + "kind": "ServiceAccount", + "apiVersion": "v1", + "metadata": { + "name": "example-serviceaccount", + "namespace": "default", + "selfLink": "/api/v1/namespaces/default/serviceaccounts/example-serviceaccount", + "uid": "abcd1234-5678-90ef-ghij-klmnopqrstuv", + "resourceVersion": "123456", + "creationTimestamp": "2023-09-04T12:34:56Z" + }, + "secrets": [ + { + "name": "example-serviceaccount-token-abcdef" + }, + { + "name": "example-another-secret" + } + ], + "imagePullSecrets": [] +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/testdata/statefulSetEvent.json b/components/processors/observek8sattributesprocessor/testdata/statefulSetEvent.json new file mode 100644 index 000000000..70dc72bf7 --- /dev/null +++ b/components/processors/observek8sattributesprocessor/testdata/statefulSetEvent.json @@ -0,0 +1,223 @@ +{ + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "metadata": { + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"apps/v1\",\"kind\":\"StatefulSet\",\"metadata\":{\"annotations\":{},\"name\":\"redis-ephemeral\",\"namespace\":\"testbox\"},\"spec\":{\"replicas\":1,\"selector\":{\"matchLabels\":{\"app\":\"redis-ephemeral\"}},\"serviceName\":\"redis-ephemeral\",\"template\":{\"metadata\":{\"labels\":{\"app\":\"redis-ephemeral\"}},\"spec\":{\"containers\":[{\"command\":[\"/usr/local/bin/redis-server\",\"/etc/redis/redis.conf\"],\"image\":\"redis:7.0.11-bullseye\",\"name\":\"redis\",\"ports\":[{\"containerPort\":6379,\"name\":\"redis\"}],\"resources\":{\"requests\":{\"cpu\":0.1,\"memory\":\"128M\"}},\"volumeMounts\":[{\"mountPath\":\"/etc/redis/\",\"name\":\"redis-ephemeral-config\"}]}],\"volumes\":[{\"configMap\":{\"name\":\"redis-ephemeral-config\"},\"name\":\"redis-ephemeral-config\"}]}}}}\n" + }, + "creationTimestamp": "2024-07-24T01:28:16Z", + "generation": 1, + "managedFields": [ + { + "apiVersion": "apps/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:metadata": { + "f:annotations": { + ".": {}, + "f:kubectl.kubernetes.io/last-applied-configuration": {} + } + }, + "f:spec": { + "f:persistentVolumeClaimRetentionPolicy": { + ".": {}, + "f:whenDeleted": {}, + "f:whenScaled": {} + }, + "f:podManagementPolicy": {}, + "f:replicas": {}, + "f:revisionHistoryLimit": {}, + "f:selector": {}, + "f:serviceName": {}, + "f:template": { + "f:metadata": { + "f:labels": { + ".": {}, + "f:app": {} + } + }, + "f:spec": { + "f:containers": { + "k:{\"name\":\"redis\"}": { + ".": {}, + "f:command": {}, + "f:image": {}, + "f:imagePullPolicy": {}, + "f:name": {}, + "f:ports": { + ".": {}, + "k:{\"containerPort\":6379,\"protocol\":\"TCP\"}": { + ".": {}, + "f:containerPort": {}, + "f:name": {}, + "f:protocol": {} + } + }, + "f:resources": { + ".": {}, + "f:requests": { + ".": {}, + "f:cpu": {}, + "f:memory": {} + } + }, + "f:terminationMessagePath": {}, + "f:terminationMessagePolicy": {}, + "f:volumeMounts": { + ".": {}, + "k:{\"mountPath\":\"/etc/redis/\"}": { + ".": {}, + "f:mountPath": {}, + "f:name": {} + } + } + } + }, + "f:dnsPolicy": {}, + "f:restartPolicy": {}, + "f:schedulerName": {}, + "f:securityContext": {}, + "f:terminationGracePeriodSeconds": {}, + "f:volumes": { + ".": {}, + "k:{\"name\":\"redis-ephemeral-config\"}": { + ".": {}, + "f:configMap": { + ".": {}, + "f:defaultMode": {}, + "f:name": {} + }, + "f:name": {} + } + } + } + }, + "f:updateStrategy": { + "f:rollingUpdate": { + ".": {}, + "f:partition": {} + }, + "f:type": {} + } + } + }, + "manager": "kubectl-client-side-apply", + "operation": "Update", + "time": "2024-07-24T01:28:16Z" + }, + { + "apiVersion": "apps/v1", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:status": { + "f:availableReplicas": {}, + "f:collisionCount": {}, + "f:currentReplicas": {}, + "f:currentRevision": {}, + "f:observedGeneration": {}, + "f:readyReplicas": {}, + "f:replicas": {}, + "f:updateRevision": {}, + "f:updatedReplicas": {} + } + }, + "manager": "kube-controller-manager", + "operation": "Update", + "subresource": "status", + "time": "2024-07-24T01:28:18Z" + } + ], + "name": "redis-ephemeral", + "namespace": "testbox", + "resourceVersion": "8463632", + "uid": "5e90c4bb-eac8-45a1-be75-b4b7b068e96a" + }, + "spec": { + "persistentVolumeClaimRetentionPolicy": { + "whenDeleted": "Retain", + "whenScaled": "Retain" + }, + "podManagementPolicy": "OrderedReady", + "replicas": 1, + "revisionHistoryLimit": 10, + "selector": { + "matchLabels": { + "app": "redis-ephemeral" + } + }, + "serviceName": "redis-ephemeral", + "template": { + "metadata": { + "creationTimestamp": null, + "labels": { + "app": "redis-ephemeral" + } + }, + "spec": { + "containers": [ + { + "command": [ + "/usr/local/bin/redis-server", + "/etc/redis/redis.conf" + ], + "image": "redis:7.0.11-bullseye", + "imagePullPolicy": "IfNotPresent", + "name": "redis", + "ports": [ + { + "containerPort": 6379, + "name": "redis", + "protocol": "TCP" + } + ], + "resources": { + "requests": { + "cpu": "100m", + "memory": "128M" + } + }, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "volumeMounts": [ + { + "mountPath": "/etc/redis/", + "name": "redis-ephemeral-config" + } + ] + } + ], + "dnsPolicy": "ClusterFirst", + "restartPolicy": "Always", + "schedulerName": "default-scheduler", + "securityContext": {}, + "terminationGracePeriodSeconds": 30, + "volumes": [ + { + "configMap": { + "defaultMode": 420, + "name": "redis-ephemeral-config" + }, + "name": "redis-ephemeral-config" + } + ] + } + }, + "updateStrategy": { + "rollingUpdate": { + "partition": 0 + }, + "type": "RollingUpdate" + } + }, + "status": { + "availableReplicas": 1, + "collisionCount": 0, + "currentReplicas": 1, + "currentRevision": "redis-ephemeral-784f45f78f", + "observedGeneration": 1, + "readyReplicas": 1, + "replicas": 1, + "updateRevision": "redis-ephemeral-784f45f78f", + "updatedReplicas": 1 + } +} \ No newline at end of file diff --git a/components/processors/observek8sattributesprocessor/types.go b/components/processors/observek8sattributesprocessor/types.go index 38bfa10c3..853f6ca02 100644 --- a/components/processors/observek8sattributesprocessor/types.go +++ b/components/processors/observek8sattributesprocessor/types.go @@ -3,7 +3,8 @@ package observek8sattributesprocessor import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -31,10 +32,10 @@ type K8sEventProcessorAction interface { } type podAction interface { - ComputeAttributes(v1.Pod) (attributes, error) + ComputeAttributes(corev1.Pod) (attributes, error) } type nodeAction interface { - ComputeAttributes(v1.Node) (attributes, error) + ComputeAttributes(corev1.Node) (attributes, error) } type jobAction interface { ComputeAttributes(batchv1.Job) (attributes, error) @@ -45,12 +46,30 @@ type cronJobAction interface { type daemonSetAction interface { ComputeAttributes(appsv1.DaemonSet) (attributes, error) } +type statefulSetAction interface { + ComputeAttributes(appsv1.StatefulSet) (attributes, error) +} +type persistentVolumeClaimAction interface { + ComputeAttributes(corev1.PersistentVolumeClaim) (attributes, error) +} +type persistentVolumeAction interface { + ComputeAttributes(corev1.PersistentVolume) (attributes, error) +} +type ingressAction interface { + ComputeAttributes(netv1.Ingress) (attributes, error) +} +type serviceAccountAction interface { + ComputeAttributes(corev1.ServiceAccount) (attributes, error) +} +type endpointsAction interface { + ComputeAttributes(corev1.Endpoints) (attributes, error) +} func (proc *K8sEventsProcessor) RunActions(obj metav1.Object) (attributes, error) { switch typed := obj.(type) { - case *v1.Pod: + case *corev1.Pod: return proc.runPodActions(*typed) - case *v1.Node: + case *corev1.Node: return proc.runNodeActions(*typed) case *batchv1.Job: return proc.runJobActions(*typed) @@ -58,11 +77,23 @@ func (proc *K8sEventsProcessor) RunActions(obj metav1.Object) (attributes, error return proc.runCronJobActions(*typed) case *appsv1.DaemonSet: return proc.runDaemonSetActions(*typed) + case *appsv1.StatefulSet: + return proc.runStatefulSetActions(*typed) + case *corev1.PersistentVolume: + return proc.runPersistentVolumeActions(*typed) + case *corev1.PersistentVolumeClaim: + return proc.runPersistentVolumeClaimActions(*typed) + case *netv1.Ingress: + return proc.runIngressActions(*typed) + case *corev1.ServiceAccount: + return proc.runServiceAccountActions(*typed) + case *corev1.Endpoints: + return proc.runEndpointsActions(*typed) } return attributes{}, nil } -func (m *K8sEventsProcessor) runPodActions(pod v1.Pod) (attributes, error) { +func (m *K8sEventsProcessor) runPodActions(pod corev1.Pod) (attributes, error) { res := attributes{} for _, action := range m.podActions { atts, err := action.ComputeAttributes(pod) @@ -74,7 +105,7 @@ func (m *K8sEventsProcessor) runPodActions(pod v1.Pod) (attributes, error) { return res, nil } -func (m *K8sEventsProcessor) runNodeActions(node v1.Node) (attributes, error) { +func (m *K8sEventsProcessor) runNodeActions(node corev1.Node) (attributes, error) { res := attributes{} for _, action := range m.nodeActions { atts, err := action.ComputeAttributes(node) @@ -114,6 +145,18 @@ func (m *K8sEventsProcessor) runCronJobActions(cronJob batchv1.CronJob) (attribu return res, nil } +func (m *K8sEventsProcessor) runStatefulSetActions(statefulset appsv1.StatefulSet) (attributes, error) { + res := attributes{} + for _, action := range m.statefulSetActions { + atts, err := action.ComputeAttributes(statefulset) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +} + func (m *K8sEventsProcessor) runDaemonSetActions(daemonset appsv1.DaemonSet) (attributes, error) { res := attributes{} for _, action := range m.daemonSetActions { @@ -125,3 +168,63 @@ func (m *K8sEventsProcessor) runDaemonSetActions(daemonset appsv1.DaemonSet) (at } return res, nil } + +func (m *K8sEventsProcessor) runPersistentVolumeActions(pvc corev1.PersistentVolume) (attributes, error) { + res := attributes{} + for _, action := range m.persistentVolumeActions { + atts, err := action.ComputeAttributes(pvc) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +} + +func (m *K8sEventsProcessor) runPersistentVolumeClaimActions(pvc corev1.PersistentVolumeClaim) (attributes, error) { + res := attributes{} + for _, action := range m.persistentVolumeClaimActions { + atts, err := action.ComputeAttributes(pvc) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +} + +func (m *K8sEventsProcessor) runIngressActions(ingress netv1.Ingress) (attributes, error) { + res := attributes{} + for _, action := range m.ingressActions { + atts, err := action.ComputeAttributes(ingress) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +} + +func (m *K8sEventsProcessor) runServiceAccountActions(serviceAccount corev1.ServiceAccount) (attributes, error) { + res := attributes{} + for _, action := range m.serviceAccountActions { + atts, err := action.ComputeAttributes(serviceAccount) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +} + +func (m *K8sEventsProcessor) runEndpointsActions(endpoints corev1.Endpoints) (attributes, error) { + res := attributes{} + for _, action := range m.endpointsActions { + atts, err := action.ComputeAttributes(endpoints) + if err != nil { + return res, err + } + res.addAttributes(atts) + } + return res, nil +}