Skip to content

feat: Add processor actions for new resources #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ func NewDaemonsetSelectorAction() DaemonSetSelectorAction {
return DaemonSetSelectorAction{}
}

// ---------------------------------- Daemonset "selector" ----------------------------------
// ---------------------------------- DaemonSet "selector" ----------------------------------

// Generates the Daemonset "status" facet. Same logic as kubectl printer
// https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L1204
// Generates the Daemonset "selector" facet.
func (DaemonSetSelectorAction) ComputeAttributes(daemonset appsv1.DaemonSet) (attributes, error) {
selecotString := metav1.FormatLabelSelector(daemonset.Spec.Selector)
return attributes{DaemonsetSelectorAttributeKey: selecotString}, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package observek8sattributesprocessor

import (
"net"
"strconv"

corev1 "k8s.io/api/core/v1"
)

const (
EnpdointsAttributeKey = "endpoints"
)

// ---------------------------------- Endpoints "endpoints" ----------------------------------

type EndpointsStatusAction struct{}

func NewEndpointsStatusAction() EndpointsStatusAction {
return EndpointsStatusAction{}
}

// Generates the Endpoints "endpoints" facet, which is a list of all individual endpoints, encoded as strings
func (EndpointsStatusAction) ComputeAttributes(endpoints corev1.Endpoints) (attributes, error) {
list := []string{}
for _, ss := range endpoints.Subsets {
if len(ss.Ports) == 0 {
// It's possible to have headless services with no ports.
for i := range ss.Addresses {
list = append(list, ss.Addresses[i].IP)
}
// avoid nesting code too deeply
continue
}

// "Normal" services with ports defined.
for _, port := range ss.Ports {
for i := range ss.Addresses {
addr := &ss.Addresses[i]
hostPort := net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port)))
list = append(list, hostPort)
}
}
}
return attributes{EnpdointsAttributeKey: list}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package observek8sattributesprocessor

import "testing"

func TestEndpointsActions(t *testing.T) {
for _, testCase := range []k8sEventProcessorTest{
{
name: "Endpoints",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/endpointsEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.endpoints", []any{"10.244.0.53:5432"}},
},
},
} {
runTest(t, testCase)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package observek8sattributesprocessor

import (
"strings"

netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

const (
IngressRulesAttributeKey = "rules"
IngressLoadBalancerAttributeKey = "loadBalancer"
)

// Adapted from https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L1373
func formatIngressRules(rules []netv1.IngressRule) string {
list := []string{}
for _, rule := range rules {
list = append(list, rule.Host)
}
if len(list) == 0 {
return "*"
}
ret := strings.Join(list, ",")
return ret
}

// ---------------------------------- Ingress "rules" ----------------------------------
type IngressRulesAction struct{}

func NewIngressRulesAction() IngressRulesAction {
return IngressRulesAction{}
}

// Generates the Ingress "rules" facet.
func (IngressRulesAction) ComputeAttributes(ingress netv1.Ingress) (attributes, error) {
return attributes{IngressRulesAttributeKey: formatIngressRules(ingress.Spec.Rules)}, nil
}

// ---------------------------------- Ingress "loadBalancer" ----------------------------------
type IngressLoadBalancerAction struct{}

func NewIngressLoadBalancerAction() IngressLoadBalancerAction {
return IngressLoadBalancerAction{}
}

// Adapted from https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L1420
// (removed wide option and always extract full info)
func ingressLoadBalancerStatusStringer(s netv1.IngressLoadBalancerStatus) string {
ingress := s.Ingress
result := sets.NewString()
for i := range ingress {
if ingress[i].IP != "" {
result.Insert(ingress[i].IP)
} else if ingress[i].Hostname != "" {
result.Insert(ingress[i].Hostname)
}
}

r := strings.Join(result.List(), ",")
return r
}

// Generates the Ingress "loadBalancer" facet.
func (IngressLoadBalancerAction) ComputeAttributes(ingress netv1.Ingress) (attributes, error) {
return attributes{IngressLoadBalancerAttributeKey: ingressLoadBalancerStatusStringer(ingress.Status.LoadBalancer)}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package observek8sattributesprocessor

import "testing"

func TestIngressActions(t *testing.T) {
for _, testCase := range []k8sEventProcessorTest{
{
name: "Ingress rules",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/ingressEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.rules", "prometheus.observe-eng.com"},
},
},
{
name: "Ingress rules",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/ingressEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.loadBalancer", "someUniqueElbIdentifier.elb.us-west-2.amazonaws.com"},
},
},
} {
runTest(t, testCase)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,5 @@ func (NodeRolesAction) ComputeAttributes(node v1.Node) (attributes, error) {
}
}

ret := make([]any, 0, roles.Len())
for _, role := range roles.List() {
ret = append(ret, role)
}
return attributes{NodeRolesAttributeKey: ret}, nil
return attributes{NodeRolesAttributeKey: roles.List()}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package observek8sattributesprocessor

import (
corev1 "k8s.io/api/core/v1"
)

const (
PersistentVolumeTypeAttributeKey = "volumeType"
)

type PersistentVolumeTypeAction struct{}

func NewPersistentVolumeTypeAction() PersistentVolumeTypeAction {
return PersistentVolumeTypeAction{}
}

// Generates the PersistentVolume "type" facet.
func (PersistentVolumeTypeAction) ComputeAttributes(pvc corev1.PersistentVolume) (attributes, error) {
spec := pvc.Spec.PersistentVolumeSource
var persistentVolumeType string
switch {
case spec.GCEPersistentDisk != nil:
persistentVolumeType = "GCEPersistentDisk"
case spec.AWSElasticBlockStore != nil:
persistentVolumeType = "AWSElasticBlockStore"
case spec.HostPath != nil:
persistentVolumeType = "HostPath"
case spec.Glusterfs != nil:
persistentVolumeType = "Glusterfs"
case spec.NFS != nil:
persistentVolumeType = "NFS"
case spec.RBD != nil:
persistentVolumeType = "RBD"
case spec.ISCSI != nil:
persistentVolumeType = "ISCSI"
case spec.Cinder != nil:
persistentVolumeType = "Cinder"
case spec.CephFS != nil:
persistentVolumeType = "CephFS"
case spec.FC != nil:
persistentVolumeType = "FC"
case spec.Flocker != nil:
persistentVolumeType = "Flocker"
case spec.FlexVolume != nil:
persistentVolumeType = "FlexVolume"
case spec.AzureFile != nil:
persistentVolumeType = "AzureFile"
case spec.VsphereVolume != nil:
persistentVolumeType = "VsphereVolume"
case spec.Quobyte != nil:
persistentVolumeType = "Quobyte"
case spec.AzureDisk != nil:
persistentVolumeType = "AzureDisk"
case spec.PhotonPersistentDisk != nil:
persistentVolumeType = "PhotonPersistentDisk"
case spec.PortworxVolume != nil:
persistentVolumeType = "PortworxVolume"
case spec.ScaleIO != nil:
persistentVolumeType = "ScaleIO"
case spec.Local != nil:
persistentVolumeType = "Local"
case spec.StorageOS != nil:
persistentVolumeType = "StorageOS"
case spec.CSI != nil:
persistentVolumeType = "CSI"
default:
// This should never happen, since exactly one of the above should be set
persistentVolumeType = "Unknown"
}
return attributes{PersistentVolumeTypeAttributeKey: persistentVolumeType}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package observek8sattributesprocessor

import "testing"

func TestPersistentVolumeActions(t *testing.T) {
for _, testCase := range []k8sEventProcessorTest{
{
name: "Extract PersistentVolume type (AWSElasticBlockStore)",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/persistentVolumeAWSElasticBlockStoreEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.volumeType", "AWSElasticBlockStore"},
},
},
{
name: "Extract PersistentVolume type (HostPath)",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/persistentVolumeHostPathEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.volumeType", "HostPath"},
},
},
} {
runTest(t, testCase)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package observek8sattributesprocessor

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
PersistentVolumeClaimSelectorAttributeKey = "selector"
)

type PersistentVolumeClaimSelectorAction struct{}

func NewPersistentVolumeClaimSelectorAction() PersistentVolumeClaimSelectorAction {
return PersistentVolumeClaimSelectorAction{}
}

// Generates the PersistentVolumeClaim "selector" facet.
func (PersistentVolumeClaimSelectorAction) ComputeAttributes(pvc corev1.PersistentVolumeClaim) (attributes, error) {
selecotString := metav1.FormatLabelSelector(pvc.Spec.Selector)
return attributes{PersistentVolumeClaimSelectorAttributeKey: selecotString}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package observek8sattributesprocessor

import "testing"

func TestPersistentVolumeClaimActions(t *testing.T) {
for _, testCase := range []k8sEventProcessorTest{
{
name: "Pretty print of a PersistentVolumeClaim's selector",
inLogs: resourceLogsFromSingleJsonEvent("./testdata/persistentVolumeClaimEvent.json"),
expectedResults: []queryWithResult{
{"observe_transform.facets.selector", "environment in (production,staging),storage-tier=high-performance"},
},
},
} {
runTest(t, testCase)
}
}
Loading
Loading