Skip to content

Commit 0099f34

Browse files
feat: Add k8s Node and Pod facets with custom processors (#74)
New facets for Node: status, roles. New facets for Pod: restarts, ready_containers, total_containers, readinessGatesReady, readinessGatesTotal. Rewrite the testing framework a bit to: (1) use jmespath to query only specific fields of interest, instead of manually creating JSON-like objects in Go to exactly match the whole output; (2) simplify the process of adding new tests for new processors.
1 parent c1d1d68 commit 0099f34

14 files changed

+3631
-121
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package observek8sattributesprocessor
2+
3+
func filterNodeEvents(event K8sEvent) bool {
4+
return event.Kind == "Node"
5+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
7+
"strings"
8+
9+
"go.opentelemetry.io/collector/pdata/plog"
10+
v1 "k8s.io/api/core/v1"
11+
"k8s.io/apimachinery/pkg/util/sets"
12+
)
13+
14+
const (
15+
NodeRolesAttributeKey = "roles"
16+
// labelNodeRolePrefix is a label prefix for node roles
17+
labelNodeRolePrefix = "node-role.kubernetes.io/"
18+
19+
// nodeLabelRole specifies the role of a node
20+
nodeLabelRole = "kubernetes.io/role"
21+
)
22+
23+
// NodeRolesAction pairs getNodeRoles, which produces the Node "roles"
// attribute, with the filter that only accepts Node events.
var NodeRolesAction = K8sEventProcessorAction{
24+
ComputeAttributes: getNodeRoles,
25+
// Reuse the function to filter events for nodes
26+
FilterFn: filterNodeEvents,
27+
}
28+
29+
// Generates the Node "status" facet. Assumes that objLog is a log from a Node event.
30+
func getNodeRoles(objLog plog.LogRecord) (attributes, error) {
31+
var node v1.Node
32+
err := json.Unmarshal([]byte(objLog.Body().AsString()), &node)
33+
if err != nil {
34+
return nil, errors.New("could not unmarshal Node")
35+
}
36+
// based on https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/pkg/printers/internalversion/printers.go#L183https://github.com/kubernetes/kubernetes/blob/1e12d92a5179dbfeb455c79dbf9120c8536e5f9c/pkg/printers/internalversion/printers.go#L14875
37+
roles := sets.NewString()
38+
for k, v := range node.Labels {
39+
switch {
40+
// The role could be in the key and not in the value
41+
case strings.HasPrefix(k, labelNodeRolePrefix):
42+
if role := strings.TrimPrefix(k, labelNodeRolePrefix); len(role) > 0 {
43+
roles.Insert(role)
44+
}
45+
case k == nodeLabelRole && v != "":
46+
roles.Insert(v)
47+
}
48+
}
49+
50+
ret := make([]any, 0, roles.Len())
51+
for _, role := range roles.List() {
52+
ret = append(ret, role)
53+
}
54+
return attributes{NodeRolesAttributeKey: ret}, nil
55+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
7+
"go.opentelemetry.io/collector/pdata/plog"
8+
apiv1 "k8s.io/api/core/v1"
9+
)
10+
11+
const (
12+
NodeStatusAttributeKey = "status"
13+
)
14+
15+
// NodeStatusAction pairs getNodeStatus, which produces the Node "status"
// attribute, with the filter that only accepts Node events.
var NodeStatusAction = K8sEventProcessorAction{
16+
ComputeAttributes: getNodeStatus,
17+
FilterFn: filterNodeEvents,
18+
}
19+
20+
// Generates the Node "status" facet. Assumes that objLog is a log from a Node event.
21+
func getNodeStatus(objLog plog.LogRecord) (attributes, error) {
22+
var n apiv1.Node
23+
err := json.Unmarshal([]byte(objLog.Body().AsString()), &n)
24+
if err != nil {
25+
return nil, errors.New("could not unmarshal Node")
26+
}
27+
// based on https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/pkg/printers/internalversion/printers.go#L1835
28+
// Although with a simplified logic that is faster to compute and uses less memory
29+
var status string
30+
// For now, we only care about "Ready"/"Not Ready", that's why we simplify the logic
31+
for _, condition := range n.Status.Conditions {
32+
if condition.Type != apiv1.NodeReady {
33+
continue
34+
}
35+
status = string(condition.Type)
36+
if condition.Status == apiv1.ConditionFalse {
37+
status = "Not" + status
38+
}
39+
}
40+
// If there's no Ready condition in the status, use Unknown
41+
if status == "" {
42+
status = "Unknown"
43+
}
44+
if n.Spec.Unschedulable {
45+
status += ", SchedulingDisabled"
46+
}
47+
48+
return attributes{NodeStatusAttributeKey: status}, nil
49+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package observek8sattributesprocessor
2+
3+
func filterPodEvents(event K8sEvent) bool {
4+
return event.Kind == "Pod"
5+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
7+
"go.opentelemetry.io/collector/pdata/plog"
8+
v1 "k8s.io/api/core/v1"
9+
)
10+
11+
const (
12+
// This action will be ignored and not written in any of the facets, since
13+
// we return map[string]any
14+
PodContainerRestartsAttributeKey = "restarts"
15+
PodTotalContainersAttributeKey = "total_containers"
16+
PodReadyContainersAttributeKey = "ready_containers"
17+
)
18+
19+
// PodContainersCountsAction computes the "restarts", "total_containers" and
20+
// "ready_containers" facets for a Pod from its container statuses.
21+
//
22+
// We compute more facets into a single action to avoid iterating over the
23+
// same slice multiple times in different actions.
24+
var PodContainersCountsAction = K8sEventProcessorAction{
25+
ComputeAttributes: getPodCounts,
26+
FilterFn: filterPodEvents,
27+
}
28+
29+
func getPodCounts(objLog plog.LogRecord) (attributes, error) {
30+
var p v1.Pod
31+
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
32+
if err != nil {
33+
return nil, errors.New("Unknown")
34+
}
35+
// we use int32 since containerStatuses.restartCount is int32
36+
var restartsCount int32
37+
// We don't need to use a hash set on the container ID for these two facets,
38+
// since the containerStatuses contain one entry per container.
39+
var readyContainers int64
40+
var allContainers int64
41+
42+
for _, stat := range p.Status.ContainerStatuses {
43+
restartsCount += stat.RestartCount
44+
allContainers++
45+
if stat.Ready {
46+
readyContainers++
47+
}
48+
}
49+
50+
// Returning map[string]any will make the processor add its elements as
51+
// separate facets, rather than adding the whole map under the key of this action
52+
return attributes{
53+
PodContainerRestartsAttributeKey: int64(restartsCount),
54+
PodTotalContainersAttributeKey: allContainers,
55+
PodReadyContainersAttributeKey: readyContainers,
56+
}, nil
57+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
7+
"go.opentelemetry.io/collector/pdata/plog"
8+
v1 "k8s.io/api/core/v1"
9+
)
10+
11+
const (
12+
// This action will be ignored and not written in any of the facets, since
13+
// we return map[string]any
14+
PodReadinessGatesReadyAttributeKey = "readinessGatesReady"
15+
PodReadinessGatesTotalAttributeKey = "readinessGatesTotal"
16+
)
17+
18+
// PodReadinessAction computes the readiness-gate facets for a Pod by matching
19+
// each readiness gate in the Pod spec against the Pod's status conditions.
20+
//
21+
// We compute both facets in a single action to avoid iterating over the
22+
// same slices multiple times in different actions.
23+
var PodReadinessAction = K8sEventProcessorAction{
24+
ComputeAttributes: getPodReadiness,
25+
FilterFn: filterPodEvents,
26+
}
27+
28+
func getPodReadiness(objLog plog.LogRecord) (attributes, error) {
29+
var pod v1.Pod
30+
err := json.Unmarshal([]byte(objLog.Body().AsString()), &pod)
31+
if err != nil {
32+
return nil, errors.New("could not unmarshal Pod")
33+
}
34+
readinessGatesReady := 0
35+
36+
if len(pod.Spec.ReadinessGates) > 0 {
37+
for _, readinessGate := range pod.Spec.ReadinessGates {
38+
conditionType := readinessGate.ConditionType
39+
for _, condition := range pod.Status.Conditions {
40+
if condition.Type == conditionType {
41+
if condition.Status == v1.ConditionTrue {
42+
readinessGatesReady++
43+
}
44+
break
45+
}
46+
}
47+
}
48+
}
49+
50+
return attributes{
51+
PodReadinessGatesTotalAttributeKey: len(pod.Spec.ReadinessGates),
52+
PodReadinessGatesReadyAttributeKey: readinessGatesReady,
53+
}, nil
54+
}

components/processors/observek8sattributesprocessor/podstatus.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package observek8sattributesprocessor
22

33
import (
44
"encoding/json"
5+
"errors"
56
"fmt"
67

78
"go.opentelemetry.io/collector/pdata/plog"
@@ -14,25 +15,17 @@ const (
1415
nodeUnreachablePodReason = "NodeLost"
1516
)
1617

17-
type OTELKubernetesEvent struct {
18-
Object v1.Pod `json:"object"`
19-
}
20-
2118
var PodStatusAction = K8sEventProcessorAction{
22-
Key: PodStatusAttributeKey,
23-
ValueFn: getStatus,
24-
FilterFn: filterFn,
25-
}
26-
27-
func filterFn(event K8sEvent) bool {
28-
return event.Kind == "Pod"
19+
ComputeAttributes: getPodStatus,
20+
FilterFn: filterPodEvents,
2921
}
3022

31-
func getStatus(objLog plog.LogRecord) string {
23+
// Generates the Pod "status" facet. Assumes that objLog is a log from a Pod event.
24+
func getPodStatus(objLog plog.LogRecord) (attributes, error) {
3225
var p v1.Pod
3326
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
3427
if err != nil {
35-
return "Unknown"
28+
return nil, errors.New("Unknown")
3629
}
3730
// based on https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L901
3831
reason := string(p.Status.Phase)
@@ -99,5 +92,5 @@ func getStatus(objLog plog.LogRecord) string {
9992
reason = "Terminating"
10093
}
10194

102-
return reason
95+
return attributes{PodStatusAttributeKey: reason}, nil
10396
}

0 commit comments

Comments
 (0)