Skip to content

Commit 9f8ada5

Browse files
OB-34558 Add Pod "counts" facets
Add a single custom processor that generates 3 facets: - restarts - ready_containers - total_containers The reasoning behind a single-processor for multiple facets is that all of these 3 facets are computed by iterating the same slice. Computing them in 3 different actions means iterating the same slice 3 times.
1 parent 98abc0a commit 9f8ada5

File tree

9 files changed

+279
-78
lines changed

9 files changed

+279
-78
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package observek8sattributesprocessor
2+
3+
func filterNodeEvents(event K8sEvent) bool {
4+
return event.Kind == "Node"
5+
}

components/processors/observek8sattributesprocessor/noderoles.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ const (
2020
)
2121

2222
var NodeRolesAction = K8sEventProcessorAction{
23-
Key: NodeRolesAttributeKey,
24-
ValueFn: getNodeRoles,
23+
Key: NodeRolesAttributeKey,
24+
Transform: getNodeRoles,
2525
// Reuse the function to filter events for nodes
2626
FilterFn: filterNodeEvents,
2727
}
@@ -47,5 +47,9 @@ func getNodeRoles(objLog plog.LogRecord) any {
4747
}
4848
}
4949

50-
return roles.List()
50+
ret := make([]any, 0, roles.Len())
51+
for _, role := range roles.List() {
52+
ret = append(ret, role)
53+
}
54+
return ret
5155
}

components/processors/observek8sattributesprocessor/nodestatus.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@ const (
1212
)
1313

1414
var NodeStatusAction = K8sEventProcessorAction{
15-
Key: NodeStatusAttributeKey,
16-
ValueFn: getNodeStatus,
17-
FilterFn: filterNodeEvents,
18-
}
19-
20-
func filterNodeEvents(event K8sEvent) bool {
21-
return event.Kind == "Node"
15+
Key: NodeStatusAttributeKey,
16+
Transform: getNodeStatus,
17+
FilterFn: filterNodeEvents,
2218
}
2319

2420
// Generates the Node "status" facet. Assumes that objLog is a log from a Node event.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package observek8sattributesprocessor
2+
3+
func filterPodEvents(event K8sEvent) bool {
4+
return event.Kind == "Pod"
5+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package observek8sattributesprocessor
2+
3+
import (
4+
"encoding/json"
5+
6+
"go.opentelemetry.io/collector/pdata/plog"
7+
v1 "k8s.io/api/core/v1"
8+
)
9+
10+
const (
11+
// This action will be ignored and not written in any of the facets, since
12+
// we return map[string]any
13+
PodCountsActionKey = "pod_counts"
14+
PodContainerRestartsAttributeKey = "restarts"
15+
PodTotalContainersAttributeKey = "total_containers"
16+
PodReadyContainersAttributeKey = "ready_containers"
17+
)
18+
19+
// This action computes various facets for Pod by aggregating "status" values
20+
// across all containers of a Pod.
21+
//
22+
// We compute more facets into a single action to avoid iterating over the
23+
// same slice multiple times in different actions.
24+
var PodContainersCountsAction = K8sEventProcessorAction{
25+
Key: PodCountsActionKey,
26+
Transform: getPodCounts,
27+
FilterFn: filterPodEvents,
28+
ComputesMultipleFacets: true,
29+
}
30+
31+
func getPodCounts(objLog plog.LogRecord) any {
32+
var p v1.Pod
33+
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
34+
if err != nil {
35+
return "Unknown"
36+
}
37+
// we use int32 since containerStatuses.restartCount is int32
38+
var restartsCount int32
39+
// We don't need to use a hash set on the container ID for these two facets,
40+
// since the containerStatuses contain one entry per container.
41+
var readyContainers int64
42+
var allContainers int64
43+
44+
for _, stat := range p.Status.ContainerStatuses {
45+
restartsCount += stat.RestartCount
46+
allContainers++
47+
if stat.Ready {
48+
readyContainers++
49+
}
50+
}
51+
52+
// Returning map[string]any will make the processor add its elements as
53+
// separate facets, rather than adding the whole map under the key of this action
54+
return map[string]any{
55+
PodContainerRestartsAttributeKey: int64(restartsCount),
56+
PodTotalContainersAttributeKey: allContainers,
57+
PodReadyContainersAttributeKey: readyContainers,
58+
}
59+
}

components/processors/observek8sattributesprocessor/podstatus.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,10 @@ const (
1414
nodeUnreachablePodReason = "NodeLost"
1515
)
1616

17-
type OTELKubernetesEvent struct {
18-
Object v1.Pod `json:"object"`
19-
}
20-
2117
var PodStatusAction = K8sEventProcessorAction{
22-
Key: PodStatusAttributeKey,
23-
ValueFn: getPodStatus,
24-
FilterFn: filterPodEvents,
25-
}
26-
27-
func filterPodEvents(event K8sEvent) bool {
28-
return event.Kind == "Pod"
18+
Key: PodStatusAttributeKey,
19+
Transform: getPodStatus,
20+
FilterFn: filterPodEvents,
2921
}
3022

3123
// Generates the Pod "status" facet. Assumes that objLog is a log from a Pod event.

components/processors/observek8sattributesprocessor/processor.go

Lines changed: 97 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package observek8sattributesprocessor
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67

78
"go.opentelemetry.io/collector/component"
89
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -21,18 +22,21 @@ type K8sEventsProcessor struct {
2122
actions []K8sEventProcessorAction
2223
}
2324

25+
type transformFn func(plog.LogRecord) any
26+
2427
type K8sEventProcessorAction struct {
25-
Key string
26-
ValueFn func(plog.LogRecord) any
27-
FilterFn func(K8sEvent) bool
28+
Key string
29+
Transform transformFn
30+
FilterFn func(K8sEvent) bool
31+
ComputesMultipleFacets bool
2832
}
2933

3034
func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsProcessor {
3135
return &K8sEventsProcessor{
3236
cfg: cfg,
3337
logger: logger,
3438
actions: []K8sEventProcessorAction{
35-
PodStatusAction, NodeStatusAction, NodeRolesAction,
39+
PodStatusAction, NodeStatusAction, NodeRolesAction, PodContainersCountsAction,
3640
},
3741
}
3842
}
@@ -79,49 +83,103 @@ func (kep *K8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (p
7983
facetsMap = facets.Map()
8084
} else {
8185
facetsMap = transformMap.PutEmptyMap("facets")
86+
// Make sure we have capacity for at least as many actions as we have defined
87+
// Actions could generate more than one facet, that's taken care of afterwards.
88+
facetsMap.EnsureCapacity(len(kep.actions))
8289
}
8390

84-
// This is where the custom processor actually computes the value
85-
value := action.ValueFn(lr)
86-
87-
// TODO [eg] we probably want to make this more modular. For
88-
// now using FromRaw for complex types is fine, since we
89-
// don't plan to generate arbitrarily complex/nested facets.
90-
// For some time coming we will produce facets that are at
91-
// most slices of simple types of map of simple types,
92-
// nothing beyond that.
93-
switch typed := value.(type) {
94-
case string:
95-
facetsMap.PutStr(action.Key, typed)
96-
case int64:
97-
facetsMap.PutInt(action.Key, typed)
98-
case bool:
99-
facetsMap.PutBool(action.Key, typed)
100-
case float64:
101-
facetsMap.PutDouble(action.Key, typed)
102-
case []string:
103-
// []string can't fallback to using FromRaw([]any), as
104-
// the default implementation of FromRaw is not smart
105-
// enough to understand that the slice contains all
106-
// string, and inserts them as bytes, instead
107-
slc := facetsMap.PutEmptySlice(action.Key)
108-
slc.EnsureCapacity(len(typed))
109-
for _, str := range typed {
110-
slc.AppendEmpty().SetStr(str)
91+
// This is where the custom processor actually computes the transformed value(s)
92+
value := action.Transform(lr)
93+
94+
// If the action returns multiple facets, it means that:
95+
// - it must generate a value of type map[string]any
96+
// - we put each individual element of such map into the facets
97+
// - we ignore the key associated with the action itself, since each facet has its own key in the map
98+
if action.ComputesMultipleFacets {
99+
if facets, ok := value.(map[string]any); ok {
100+
// Make sure we have enough capacity for the new facets
101+
facetsMap.EnsureCapacity(facetsMap.Len() + len(facets))
102+
for key, val := range facets {
103+
if err := mapPut(facetsMap, key, val); err != nil {
104+
kep.logger.Error(err.Error())
105+
}
106+
}
107+
} else {
108+
kep.logger.Error("Action %s computes multiple facets and the returned value is not of type map[string]any", zap.String("name", action.Key))
111109
}
112-
case map[string]string:
113-
// Same reasoning as []string
114-
mp := facetsMap.PutEmptyMap(action.Key)
115-
mp.EnsureCapacity(len(typed))
116-
for k, v := range typed {
117-
mp.PutStr(k, v)
110+
} else {
111+
// Otherwise, whatever is generated by the facet is the value of such facet, and the facet's key is the key of the action itself.
112+
// This is true even if the action generates map[string]any when ComputesMultipleFacets == false!
113+
// This way we allow computing facets typed map[string]any
114+
if err := mapPut(facetsMap, action.Key, value); err != nil {
115+
kep.logger.Error(err.Error())
118116
}
119-
default:
120-
kep.logger.Error("sending the generated facet to Observe in bytes since no custom serialization logic is implemented", zap.Error(err))
121117
}
122118
}
123119
}
124120
}
125121
}
126122
return logs, nil
127123
}
124+
125+
func slicePut(theSlice pcommon.Slice, value any) error {
126+
elem := theSlice.AppendEmpty()
127+
switch typed := value.(type) {
128+
case string:
129+
elem.SetStr(typed)
130+
case int64:
131+
elem.SetInt(typed)
132+
case bool:
133+
elem.SetBool(typed)
134+
case float64:
135+
elem.SetDouble(typed)
136+
// Let's not complicate things and avoid putting maps/slices into slices There's
137+
// gotta be an easier way to model the processor's output to avoid this
138+
default:
139+
return errors.New("sending the generated facet to Observe in bytes since no custom serialization logic is implemented")
140+
}
141+
142+
return nil
143+
}
144+
145+
// puts "anything" into a map, with some assumptions and intentional
146+
// limitations:
147+
//
148+
// - No nested slices: can only put "base types" inside slices (although
149+
// elements of a slice can be of different [base] types).
150+
//
151+
// - Not all "base types" are covered. For instance, numbers are only int64 and float64.
152+
//
153+
// - No maps with keys of arbitrary types: only string
154+
func mapPut(theMap pcommon.Map, key string, value any) error {
155+
switch typed := value.(type) {
156+
case string:
157+
theMap.PutStr(key, typed)
158+
case int64:
159+
theMap.PutInt(key, typed)
160+
case bool:
161+
theMap.PutBool(key, typed)
162+
case float64:
163+
theMap.PutDouble(key, typed)
164+
case []any:
165+
slc := theMap.PutEmptySlice(key)
166+
slc.EnsureCapacity(len(typed))
167+
for _, elem := range typed {
168+
slicePut(slc, elem)
169+
}
170+
case map[string]any:
171+
// This is potentially arbitrarily recursive. We don't care about
172+
// checking the nesting level since we will never need to define
173+
// processors with more than one nesting level
174+
new := theMap.PutEmptyMap(key)
175+
new.EnsureCapacity(len(typed))
176+
for k, v := range typed {
177+
mapPut(new, k, v)
178+
}
179+
default:
180+
return errors.New("sending the generated facet to Observe in bytes since no custom serialization logic is implemented")
181+
}
182+
183+
return nil
184+
185+
}

components/processors/observek8sattributesprocessor/processor_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func TestK8sEventsProcessor(t *testing.T) {
4545
name: "noObserveTransformAttributes",
4646
inLogs: createResourceLogs(
4747
logWithResource{
48-
logName: "noObserveTransformAttributes",
4948
testBodyFilepath: "./testdata/podObjectEvent.json",
5049
},
5150
),
@@ -57,7 +56,6 @@ func TestK8sEventsProcessor(t *testing.T) {
5756
name: "existingObserveTransformAttributes",
5857
inLogs: createResourceLogs(
5958
logWithResource{
60-
logName: "existingObserveTransformAttributes",
6159
testBodyFilepath: "./testdata/podObjectEvent.json",
6260
recordAttributes: map[string]any{
6361
"observe_transform": map[string]interface{}{
@@ -100,6 +98,19 @@ func TestK8sEventsProcessor(t *testing.T) {
10098
{"observe_transform.facets.roles", []any{"anotherRole!", "control-plane"}},
10199
},
102100
},
101+
{
102+
name: "Pod container counts",
103+
inLogs: createResourceLogs(
104+
logWithResource{
105+
testBodyFilepath: "./testdata/podObjectEvent.json",
106+
},
107+
),
108+
expectedResults: []queryWithResult{
109+
{"observe_transform.facets.restarts", int64(5)},
110+
{"observe_transform.facets.total_containers", int64(4)},
111+
{"observe_transform.facets.ready_containers", int64(3)},
112+
},
113+
},
103114
} {
104115
t.Run(test.name, func(t *testing.T) {
105116
kep := newK8sEventsProcessor(zap.NewNop(), &Config{})

0 commit comments

Comments
 (0)