Skip to content

Commit 9bb3262

Browse files
OB-34558 Add Pod "counts" facets
Add a single custom processor that generates 3 facets: - restarts - ready_containers - total_containers The reasoning behind a single-processor for multiple facets is that all of these 3 facets are computed by iterating the same slice. Computing them in 3 different actions means iterating the same slice 3 times. This reduces overhead at the cost of making a simplification in the data types that can be generated by custom processors: a processor that returns a map of type "map[string]any" will NOT put a map under its action key, but rather unwrap the map and put all the key-value pairs directly into the facets.
1 parent 98abc0a commit 9bb3262

File tree

9 files changed

+263
-80
lines changed

9 files changed

+263
-80
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package observek8sattributesprocessor
2+
3+
func filterNodeEvents(event K8sEvent) bool {
4+
return event.Kind == "Node"
5+
}

components/processors/observek8sattributesprocessor/noderoles.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ const (
2020
)
2121

2222
var NodeRolesAction = K8sEventProcessorAction{
23-
Key: NodeRolesAttributeKey,
24-
ValueFn: getNodeRoles,
23+
Key: NodeRolesAttributeKey,
24+
Transform: getNodeRoles,
2525
// Reuse the function to filter events for nodes
2626
FilterFn: filterNodeEvents,
2727
}
@@ -47,5 +47,9 @@ func getNodeRoles(objLog plog.LogRecord) any {
4747
}
4848
}
4949

50-
return roles.List()
50+
ret := make([]any, 0, roles.Len())
51+
for _, role := range roles.List() {
52+
ret = append(ret, role)
53+
}
54+
return ret
5155
}

components/processors/observek8sattributesprocessor/nodestatus.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@ const (
1212
)
1313

1414
var NodeStatusAction = K8sEventProcessorAction{
15-
Key: NodeStatusAttributeKey,
16-
ValueFn: getNodeStatus,
17-
FilterFn: filterNodeEvents,
18-
}
19-
20-
func filterNodeEvents(event K8sEvent) bool {
21-
return event.Kind == "Node"
15+
Key: NodeStatusAttributeKey,
16+
Transform: getNodeStatus,
17+
FilterFn: filterNodeEvents,
2218
}
2319

2420
// Generates the Node "status" facet. Assumes that objLog is a log from a Node event.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package observek8sattributesprocessor
2+
3+
func filterPodEvents(event K8sEvent) bool {
4+
return event.Kind == "Pod"
5+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
"encoding/json"
5+
6+
"go.opentelemetry.io/collector/pdata/plog"
7+
v1 "k8s.io/api/core/v1"
8+
)
9+
10+
const (
11+
// This action will be ignored and not written in any of the facets, since
12+
// we return map[string]any
13+
PodCountsActionKey = "pod_counts"
14+
PodContainerRestartsAttributeKey = "restarts"
15+
PodTotalContainersAttributeKey = "total_containers"
16+
PodReadyContainersAttributeKey = "ready_containers"
17+
)
18+
19+
// This action computes various facets for Pod by aggregating "status" values
20+
// across all containers of a Pod.
21+
//
22+
// We compute more facets into a single action to avoid iterating over the
23+
// same slice multiple times in different actions.
24+
var PodContainersCountsAction = K8sEventProcessorAction{
25+
Key: PodCountsActionKey,
26+
Transform: getPodCounts,
27+
FilterFn: filterPodEvents,
28+
}
29+
30+
func getPodCounts(objLog plog.LogRecord) any {
31+
var p v1.Pod
32+
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
33+
if err != nil {
34+
return "Unknown"
35+
}
36+
// we use int32 since containerStatuses.restartCount is int32
37+
var restartsCount int32
38+
// We don't need to use a hash set on the container ID for these two facets,
39+
// since the containerStatuses contain one entry per container.
40+
var readyContainers int64
41+
var allContainers int64
42+
43+
for _, stat := range p.Status.ContainerStatuses {
44+
restartsCount += stat.RestartCount
45+
allContainers++
46+
if stat.Ready {
47+
readyContainers++
48+
}
49+
}
50+
51+
// Returning map[string]any will make the processor add its elements as
52+
// separate facets, rather than adding the whole map under the key of this action
53+
return map[string]any{
54+
PodContainerRestartsAttributeKey: int64(restartsCount),
55+
PodTotalContainersAttributeKey: allContainers,
56+
PodReadyContainersAttributeKey: readyContainers,
57+
}
58+
}

components/processors/observek8sattributesprocessor/podstatus.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,10 @@ const (
1414
nodeUnreachablePodReason = "NodeLost"
1515
)
1616

17-
type OTELKubernetesEvent struct {
18-
Object v1.Pod `json:"object"`
19-
}
20-
2117
var PodStatusAction = K8sEventProcessorAction{
22-
Key: PodStatusAttributeKey,
23-
ValueFn: getPodStatus,
24-
FilterFn: filterPodEvents,
25-
}
26-
27-
func filterPodEvents(event K8sEvent) bool {
28-
return event.Kind == "Pod"
18+
Key: PodStatusAttributeKey,
19+
Transform: getPodStatus,
20+
FilterFn: filterPodEvents,
2921
}
3022

3123
// Generates the Pod "status" facet. Assumes that objLog is a log from a Pod event.

components/processors/observek8sattributesprocessor/processor.go

Lines changed: 82 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package observek8sattributesprocessor
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67

78
"go.opentelemetry.io/collector/component"
89
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -21,18 +22,20 @@ type K8sEventsProcessor struct {
2122
actions []K8sEventProcessorAction
2223
}
2324

25+
type transformFn func(plog.LogRecord) any
26+
2427
type K8sEventProcessorAction struct {
25-
Key string
26-
ValueFn func(plog.LogRecord) any
27-
FilterFn func(K8sEvent) bool
28+
Key string
29+
Transform transformFn
30+
FilterFn func(K8sEvent) bool
2831
}
2932

3033
func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsProcessor {
3134
return &K8sEventsProcessor{
3235
cfg: cfg,
3336
logger: logger,
3437
actions: []K8sEventProcessorAction{
35-
PodStatusAction, NodeStatusAction, NodeRolesAction,
38+
PodStatusAction, NodeStatusAction, NodeRolesAction, PodContainersCountsAction,
3639
},
3740
}
3841
}
@@ -81,47 +84,85 @@ func (kep *K8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (p
8184
facetsMap = transformMap.PutEmptyMap("facets")
8285
}
8386

84-
// This is where the custom processor actually computes the value
85-
value := action.ValueFn(lr)
86-
87-
// TODO [eg] we probably want to make this more modular. For
88-
// now using FromRaw for complex types is fine, since we
89-
// don't plan to generate arbitrarily complex/nested facets.
90-
// For some time coming we will produce facets that are at
91-
// most slices of simple types of map of simple types,
92-
// nothing beyond that.
93-
switch typed := value.(type) {
94-
case string:
95-
facetsMap.PutStr(action.Key, typed)
96-
case int64:
97-
facetsMap.PutInt(action.Key, typed)
98-
case bool:
99-
facetsMap.PutBool(action.Key, typed)
100-
case float64:
101-
facetsMap.PutDouble(action.Key, typed)
102-
case []string:
103-
// []string can't fallback to using FromRaw([]any), as
104-
// the default implementation of FromRaw is not smart
105-
// enough to understand that the slice contains all
106-
// string, and inserts them as bytes, instead
107-
slc := facetsMap.PutEmptySlice(action.Key)
108-
slc.EnsureCapacity(len(typed))
109-
for _, str := range typed {
110-
slc.AppendEmpty().SetStr(str)
111-
}
112-
case map[string]string:
113-
// Same reasoning as []string
114-
mp := facetsMap.PutEmptyMap(action.Key)
115-
mp.EnsureCapacity(len(typed))
116-
for k, v := range typed {
117-
mp.PutStr(k, v)
118-
}
119-
default:
120-
kep.logger.Error("sending the generated facet to Observe in bytes since no custom serialization logic is implemented", zap.Error(err))
87+
// This is where the custom processor actually computes the transformed value(s)
88+
value := action.Transform(lr)
89+
if err := mapPut(facetsMap, action.Key, value); err != nil {
90+
kep.logger.Error(err.Error())
12191
}
12292
}
12393
}
12494
}
12595
}
12696
return logs, nil
12797
}
98+
99+
func slicePut(theSlice pcommon.Slice, value any) error {
100+
elem := theSlice.AppendEmpty()
101+
switch typed := value.(type) {
102+
case string:
103+
elem.SetStr(typed)
104+
case int64:
105+
elem.SetInt(typed)
106+
case bool:
107+
elem.SetBool(typed)
108+
case float64:
109+
elem.SetDouble(typed)
110+
// Let's not complicate things and avoid putting maps/slices into slices There's
111+
// gotta be an easier way to model the processor's output to avoid this
112+
default:
113+
return errors.New("sending the generated facet to Observe in bytes since no custom serialization logic is implemented")
114+
}
115+
116+
return nil
117+
}
118+
119+
// puts "anything" into a map, with some assumptions and intentional
120+
// limitations:
121+
//
122+
// - No nested maps: can only pass maps with signature map[string]any as
123+
// value. In this case, we will unwrap the inner map and put all elements of
124+
// "value" as single elements directly into theMap.
125+
//
126+
// - No nested slices: can only put "base types" inside slices (although
127+
// elements of a slice can be of different [base] types).
128+
//
129+
// - Not all "base types" are covered. For instance, numbers are only int64 and float64.
130+
func mapPut(theMap pcommon.Map, key string, value any) error {
131+
switch typed := value.(type) {
132+
case string:
133+
theMap.PutStr(key, typed)
134+
case int64:
135+
theMap.PutInt(key, typed)
136+
case bool:
137+
theMap.PutBool(key, typed)
138+
case float64:
139+
theMap.PutDouble(key, typed)
140+
case []any:
141+
// []string can't fallback to using FromRaw([]any), as
142+
// the default implementation of FromRaw is not smart
143+
// enough to understand that the slice contains all
144+
// string, and inserts them as bytes, instead
145+
slc := theMap.PutEmptySlice(key)
146+
slc.EnsureCapacity(len(typed))
147+
for _, elem := range typed {
148+
slicePut(slc, elem)
149+
}
150+
case map[string]any:
151+
// For now, we treat this as a special case to allow single transforms
152+
// generating multiple facets with a single function (most likely
153+
// because they are trivial to compute and to avoid iterating over the
154+
// same array more than once, see Node restarts, total_containers,
155+
// ready_containers). This means that we IGNORE the "key" associated
156+
// with the whole map and insert the contents of the map directly into
157+
// theMap.
158+
theMap.EnsureCapacity(theMap.Len() + len(typed))
159+
for k, v := range typed {
160+
mapPut(theMap, k, v)
161+
}
162+
default:
163+
return errors.New("sending the generated facet to Observe in bytes since no custom serialization logic is implemented")
164+
}
165+
166+
return nil
167+
168+
}

components/processors/observek8sattributesprocessor/processor_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func TestK8sEventsProcessor(t *testing.T) {
4545
name: "noObserveTransformAttributes",
4646
inLogs: createResourceLogs(
4747
logWithResource{
48-
logName: "noObserveTransformAttributes",
4948
testBodyFilepath: "./testdata/podObjectEvent.json",
5049
},
5150
),
@@ -57,7 +56,6 @@ func TestK8sEventsProcessor(t *testing.T) {
5756
name: "existingObserveTransformAttributes",
5857
inLogs: createResourceLogs(
5958
logWithResource{
60-
logName: "existingObserveTransformAttributes",
6159
testBodyFilepath: "./testdata/podObjectEvent.json",
6260
recordAttributes: map[string]any{
6361
"observe_transform": map[string]interface{}{
@@ -100,6 +98,19 @@ func TestK8sEventsProcessor(t *testing.T) {
10098
{"observe_transform.facets.roles", []any{"anotherRole!", "control-plane"}},
10199
},
102100
},
101+
{
102+
name: "Pod container counts",
103+
inLogs: createResourceLogs(
104+
logWithResource{
105+
testBodyFilepath: "./testdata/podObjectEvent.json",
106+
},
107+
),
108+
expectedResults: []queryWithResult{
109+
{"observe_transform.facets.restarts", int64(5)},
110+
{"observe_transform.facets.total_containers", int64(4)},
111+
{"observe_transform.facets.ready_containers", int64(3)},
112+
},
113+
},
103114
} {
104115
t.Run(test.name, func(t *testing.T) {
105116
kep := newK8sEventsProcessor(zap.NewNop(), &Config{})

0 commit comments

Comments
 (0)