Skip to content

Commit 9755259

Browse files
fix: Override log body with structured object (#123)
Currently we were modifying the marshalled k8s object and then overriding the log body with its unmarshalled string representation. This doesn't work well when the custom processor is not last in the pipeline, because other processors might have to index into the structured body to be able to extract values. Since we need to move the transform processor after the custom processor in the agent configmap, we want to make sure that this processor leaves the log body as a structured object, rather than a plain string. ### Description OB-XXX Please explain the changes you made here. ### Checklist - [ ] Created tests which fail without the change (if possible) - [ ] Extended the README / documentation, if necessary
1 parent 2016541 commit 9755259

File tree

3 files changed

+49
-17
lines changed

3 files changed

+49
-17
lines changed

components/processors/observek8sattributesprocessor/processor.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,19 +296,24 @@ func (kep *K8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (p
296296

297297
// ALWAYS RUN BODY ACTIONS FIRST
298298
// The attributes should always be computed on the MODIFIED body.
299-
err := kep.RunBodyActions(object)
299+
modified, err := kep.RunBodyActions(object)
300300
if err != nil {
301301
kep.logger.Error("could not run body actions", zap.Error(err))
302302
continue
303303
}
304-
// We now re-marshal the object
305-
reMarshsalledBody, err := json.Marshal(object)
306-
if err != nil {
307-
kep.logger.Error("could not re-marshal body", zap.Error(err))
308-
continue
304+
// Once we have the modified k8s API object, we need to re-set
305+
// the log body. Since this process is quite expensive, we make
306+
// sure we do it only if the object has been modified at all.
307+
if modified {
308+
err := kep.setNewBody(lr, object)
309+
if err != nil {
310+
// We have already logged the error inside setNewBody
311+
continue
312+
}
309313
}
310-
// And update the body of the log record
311-
lr.Body().SetStr(string(reMarshsalledBody))
314+
315+
// Now onto the facets (extra attributes that are computed from
316+
// the object without changing the object itself).
312317

313318
// Add attributes["observe_transform"]["facets"] if it doesn't exist
314319
transform, exists := lr.Attributes().Get("observe_transform")
@@ -424,3 +429,28 @@ func mapPut(theMap pcommon.Map, key string, value any) error {
424429
return nil
425430

426431
}
432+
433+
// Sets the log body to the map[string]any representation of the object.
434+
// We cannot simply set the log body via SetStr() and use the marshalled object,
435+
// otherwise downstream processors won't be able to index into the object and
436+
// extract additional meaning!
437+
// The best approach is to convert it as such:
438+
// {k8s object} --marshal--> {json []bytes} --Unmarshal--> {map[string]any} -> set the body FromRaw()
439+
func (kep *K8sEventsProcessor) setNewBody(lr plog.LogRecord, object metav1.Object) error {
440+
bodyJson, err := json.Marshal(object)
441+
if err != nil {
442+
kep.logger.Error("could not marshal body to JSON after modifying it", zap.Error(err))
443+
return err
444+
}
445+
var bodyMap map[string]any
446+
// This will never error out, since we just marshalled it ourselves
447+
json.Unmarshal(bodyJson, &bodyMap)
448+
449+
err = lr.Body().SetEmptyMap().FromRaw(bodyMap)
450+
451+
if err != nil {
452+
kep.logger.Error("could not set body from raw", zap.Error(err))
453+
return err
454+
}
455+
return nil
456+
}

components/processors/observek8sattributesprocessor/processor_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/jmespath/go-jmespath"
1010
"github.com/stretchr/testify/require"
11+
"go.opentelemetry.io/collector/pdata/pcommon"
1112
"go.opentelemetry.io/collector/pdata/plog"
1213
"go.uber.org/zap"
1314
)
@@ -75,10 +76,12 @@ func runTest(t *testing.T, test k8sEventProcessorTest) {
7576
// Pick the right part of the log to query
7677
switch query.location {
7778
case LogLocationBody:
78-
// The body is JSON string and therefore must be unmarshalled into
79-
// map[string]any to be able to query it with jmespath.
80-
body := logRecord.Body().AsString()
81-
json.Unmarshal([]byte(body), &out)
79+
body := logRecord.Body()
80+
if body.Type() != pcommon.ValueTypeMap {
81+
t.Error("the log body should be a structured map, rather than a flat string!")
82+
return
83+
}
84+
out = body.Map().AsRaw()
8285
case LogLocationAttributes:
8386
out = logRecord.Attributes().AsRaw()
8487
}

components/processors/observek8sattributesprocessor/types.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,14 @@ type secretBodyAction interface {
116116
// This is useful, for instance, when we want to redact secrets' values, to
117117
// prevent generating attributes that contain secret's values before redacting
118118
// them.
119-
// TODO [eg] check if there's actually no copying going on here
120-
func (proc *K8sEventsProcessor) RunBodyActions(obj metav1.Object) error {
119+
// Returns true if the object was modified by any action, false otherwise
120+
func (proc *K8sEventsProcessor) RunBodyActions(obj metav1.Object) (bool, error) {
121121
switch typed := obj.(type) {
122122
case *corev1.Secret:
123-
err := proc.runSecretBodyActions(typed)
124-
return err
123+
return true, proc.runSecretBodyActions(typed)
125124
}
126125

127-
return nil
126+
return false, nil
128127
}
129128

130129
func (m *K8sEventsProcessor) runSecretBodyActions(secret *corev1.Secret) error {

0 commit comments

Comments
 (0)