Skip to content

OB-34960 Decorate k8s Node facets with custom processors #74

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions components/processors/observek8sattributesprocessor/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package observek8sattributesprocessor

func filterNodeEvents(event K8sEvent) bool {
return event.Kind == "Node"
}
55 changes: 55 additions & 0 deletions components/processors/observek8sattributesprocessor/noderoles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package observek8sattributesprocessor

import (
"encoding/json"
"errors"

"strings"

"go.opentelemetry.io/collector/pdata/plog"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

const (
NodeRolesAttributeKey = "roles"
// labelNodeRolePrefix is a label prefix for node roles
labelNodeRolePrefix = "node-role.kubernetes.io/"

// nodeLabelRole specifies the role of a node
nodeLabelRole = "kubernetes.io/role"
)

var NodeRolesAction = K8sEventProcessorAction{
ComputeAttributes: getNodeRoles,
// Reuse the function to filter events for nodes
FilterFn: filterNodeEvents,
}

// Generates the Node "status" facet. Assumes that objLog is a log from a Node event.
func getNodeRoles(objLog plog.LogRecord) (attributes, error) {
var node v1.Node
err := json.Unmarshal([]byte(objLog.Body().AsString()), &node)
if err != nil {
return nil, errors.New("could not unmarshal Node")
}
// based on https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/pkg/printers/internalversion/printers.go#L183https://github.com/kubernetes/kubernetes/blob/1e12d92a5179dbfeb455c79dbf9120c8536e5f9c/pkg/printers/internalversion/printers.go#L14875
roles := sets.NewString()
for k, v := range node.Labels {
switch {
// The role could be in the key and not in the value
case strings.HasPrefix(k, labelNodeRolePrefix):
if role := strings.TrimPrefix(k, labelNodeRolePrefix); len(role) > 0 {
roles.Insert(role)
}
case k == nodeLabelRole && v != "":
roles.Insert(v)
}
}

ret := make([]any, 0, roles.Len())
for _, role := range roles.List() {
ret = append(ret, role)
}
return attributes{NodeRolesAttributeKey: ret}, nil
}
49 changes: 49 additions & 0 deletions components/processors/observek8sattributesprocessor/nodestatus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package observek8sattributesprocessor

import (
"encoding/json"
"errors"

"go.opentelemetry.io/collector/pdata/plog"
apiv1 "k8s.io/api/core/v1"
)

const (
NodeStatusAttributeKey = "status"
)

var NodeStatusAction = K8sEventProcessorAction{
ComputeAttributes: getNodeStatus,
FilterFn: filterNodeEvents,
}

// Generates the Node "status" facet. Assumes that objLog is a log from a Node event.
func getNodeStatus(objLog plog.LogRecord) (attributes, error) {
var n apiv1.Node
err := json.Unmarshal([]byte(objLog.Body().AsString()), &n)
if err != nil {
return nil, errors.New("could not unmarshal Node")
}
// based on https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/pkg/printers/internalversion/printers.go#L1835
// Although with a simplified logic that is faster to compute and uses less memory
var status string
// For now, we only care about "Ready"/"Not Ready", that's why we simplify the logic
for _, condition := range n.Status.Conditions {
if condition.Type != apiv1.NodeReady {
continue
}
status = string(condition.Type)
if condition.Status == apiv1.ConditionFalse {
status = "Not" + status
}
}
// If there's no Ready condition in the status, use Unknown
if status == "" {
status = "Unknown"
}
if n.Spec.Unschedulable {
status += ", SchedulingDisabled"
}

return attributes{NodeStatusAttributeKey: status}, nil
}
5 changes: 5 additions & 0 deletions components/processors/observek8sattributesprocessor/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package observek8sattributesprocessor

func filterPodEvents(event K8sEvent) bool {
return event.Kind == "Pod"
}
57 changes: 57 additions & 0 deletions components/processors/observek8sattributesprocessor/podcounts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package observek8sattributesprocessor

import (
"encoding/json"
"errors"

"go.opentelemetry.io/collector/pdata/plog"
v1 "k8s.io/api/core/v1"
)

const (
// This action will be ignored and not written in any of the facets, since
// we return map[string]any
PodContainerRestartsAttributeKey = "restarts"
PodTotalContainersAttributeKey = "total_containers"
PodReadyContainersAttributeKey = "ready_containers"
)

// This action computes various facets for Pod by aggregating "status" values
// across all containers of a Pod.
//
// We compute more facets into a single action to avoid iterating over the
// same slice multiple times in different actions.
var PodContainersCountsAction = K8sEventProcessorAction{
ComputeAttributes: getPodCounts,
FilterFn: filterPodEvents,
}

func getPodCounts(objLog plog.LogRecord) (attributes, error) {
var p v1.Pod
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
if err != nil {
return nil, errors.New("Unknown")
}
// we use int32 since containerStatuses.restartCount is int32
var restartsCount int32
// We don't need to use a hash set on the container ID for these two facets,
// since the containerStatuses contain one entry per container.
var readyContainers int64
var allContainers int64

for _, stat := range p.Status.ContainerStatuses {
restartsCount += stat.RestartCount
allContainers++
if stat.Ready {
readyContainers++
}
}

// Returning map[string]any will make the processor add its elements as
// separate facets, rather than adding the whole map under the key of this action
return attributes{
PodContainerRestartsAttributeKey: int64(restartsCount),
PodTotalContainersAttributeKey: allContainers,
PodReadyContainersAttributeKey: readyContainers,
}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package observek8sattributesprocessor

import (
"encoding/json"
"errors"

"go.opentelemetry.io/collector/pdata/plog"
v1 "k8s.io/api/core/v1"
)

const (
// This action will be ignored and not written in any of the facets, since
// we return map[string]any
PodReadinessGatesReadyAttributeKey = "readinessGatesReady"
PodReadinessGatesTotalAttributeKey = "readinessGatesTotal"
)

// This action computes various facets for Pod by aggregating "status" values
// across all containers of a Pod.
//
// We compute more facets into a single action to avoid iterating over the
// same slice multiple times in different actions.
var PodReadinessAction = K8sEventProcessorAction{
ComputeAttributes: getPodReadiness,
FilterFn: filterPodEvents,
}

func getPodReadiness(objLog plog.LogRecord) (attributes, error) {
var pod v1.Pod
err := json.Unmarshal([]byte(objLog.Body().AsString()), &pod)
if err != nil {
return nil, errors.New("could not unmarshal Pod")
}
readinessGatesReady := 0

if len(pod.Spec.ReadinessGates) > 0 {
for _, readinessGate := range pod.Spec.ReadinessGates {
conditionType := readinessGate.ConditionType
for _, condition := range pod.Status.Conditions {
if condition.Type == conditionType {
if condition.Status == v1.ConditionTrue {
readinessGatesReady++
}
break
}
}
}
}

return attributes{
PodReadinessGatesTotalAttributeKey: len(pod.Spec.ReadinessGates),
PodReadinessGatesReadyAttributeKey: readinessGatesReady,
}, nil
}
21 changes: 7 additions & 14 deletions components/processors/observek8sattributesprocessor/podstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package observek8sattributesprocessor

import (
"encoding/json"
"errors"
"fmt"

"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -14,25 +15,17 @@ const (
nodeUnreachablePodReason = "NodeLost"
)

type OTELKubernetesEvent struct {
Object v1.Pod `json:"object"`
}

var PodStatusAction = K8sEventProcessorAction{
Key: PodStatusAttributeKey,
ValueFn: getStatus,
FilterFn: filterFn,
}

func filterFn(event K8sEvent) bool {
return event.Kind == "Pod"
ComputeAttributes: getPodStatus,
FilterFn: filterPodEvents,
}

func getStatus(objLog plog.LogRecord) string {
// Generates the Pod "status" facet. Assumes that objLog is a log from a Pod event.
func getPodStatus(objLog plog.LogRecord) (attributes, error) {
var p v1.Pod
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
if err != nil {
return "Unknown"
return nil, errors.New("Unknown")
}
// based on https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L901
reason := string(p.Status.Phase)
Expand Down Expand Up @@ -99,5 +92,5 @@ func getStatus(objLog plog.LogRecord) string {
reason = "Terminating"
}

return reason
return attributes{PodStatusAttributeKey: reason}, nil
}
Loading
Loading