From 65fada036067c0a47be8afc7bd81a5b7354f9c5a Mon Sep 17 00:00:00 2001 From: Enrico Giorio Date: Mon, 4 Nov 2024 19:44:14 +0000 Subject: [PATCH] fix: Override log body with structured object Previously we were modifying the unmarshalled k8s object and then overriding the log body with its re-marshalled JSON string representation. This doesn't work well when the custom processor is not last in the pipeline, because other processors might have to index into the structured body to be able to extract values. Since we need to move the transform processor after the custom processor in the agent configmap, we want to make sure that this processor leaves the log body as a structured object, rather than a plain string. --- .../processor.go | 46 +++++++++++++++---- .../processor_test.go | 11 +++-- .../observek8sattributesprocessor/types.go | 9 ++-- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/components/processors/observek8sattributesprocessor/processor.go b/components/processors/observek8sattributesprocessor/processor.go index 0e6548943..fe21cdacf 100644 --- a/components/processors/observek8sattributesprocessor/processor.go +++ b/components/processors/observek8sattributesprocessor/processor.go @@ -296,19 +296,24 @@ func (kep *K8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (p // ALWAYS RUN BODY ACTIONS FIRST // The attributes should always be computed on the MODIFIED body. - err := kep.RunBodyActions(object) + modified, err := kep.RunBodyActions(object) if err != nil { kep.logger.Error("could not run body actions", zap.Error(err)) continue } - // We now re-marshal the object - reMarshsalledBody, err := json.Marshal(object) - if err != nil { - kep.logger.Error("could not re-marshal body", zap.Error(err)) - continue + // Once we have the modified k8s API object, we need to re-set + // the log body. Since this process is quite expensive, we make + // sure we do it only if the object has been modified at all. 
+ if modified { + err := kep.setNewBody(lr, object) + if err != nil { + // We have already logged the error inside setNewBody + continue + } } - // And update the body of the log record - lr.Body().SetStr(string(reMarshsalledBody)) + + // Now onto the facets (extra attributes that are computed from + // the object without changing the object itself). // Add attributes["observe_transform"]["facets"] if it doesn't exist transform, exists := lr.Attributes().Get("observe_transform") @@ -424,3 +429,28 @@ func mapPut(theMap pcommon.Map, key string, value any) error { return nil } + +// Sets the log body to the map[string]any representation of the object. +// We cannot simply set the log body via SetStr() and use the marshalled object, +// otherwise downstream processors won't be able to index into the object and +// extract additional meaning! +// The best approach is to convert it as such: +// {k8s object} --marshal--> {JSON []byte} --unmarshal--> {map[string]any} -> set the body FromRaw() +func (kep *K8sEventsProcessor) setNewBody(lr plog.LogRecord, object metav1.Object) error { + bodyJson, err := json.Marshal(object) + if err != nil { + kep.logger.Error("could not marshal body to JSON after modifying it", zap.Error(err)) + return err + } + var bodyMap map[string]any + // This will never error out, since we just marshalled it ourselves + json.Unmarshal(bodyJson, &bodyMap) + + err = lr.Body().SetEmptyMap().FromRaw(bodyMap) + + if err != nil { + kep.logger.Error("could not set body from raw", zap.Error(err)) + return err + } + return nil +} diff --git a/components/processors/observek8sattributesprocessor/processor_test.go b/components/processors/observek8sattributesprocessor/processor_test.go index 2c8824253..729dad992 100644 --- a/components/processors/observek8sattributesprocessor/processor_test.go +++ b/components/processors/observek8sattributesprocessor/processor_test.go @@ -8,6 +8,7 @@ import ( "github.com/jmespath/go-jmespath" "github.com/stretchr/testify/require" 
+ "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" ) @@ -75,10 +76,12 @@ func runTest(t *testing.T, test k8sEventProcessorTest) { // Pick the right part of the log to query switch query.location { case LogLocationBody: - // The body is JSON string and therefore must be unmarshalled into - // map[string]any to be able to query it with jmespath. - body := logRecord.Body().AsString() - json.Unmarshal([]byte(body), &out) + body := logRecord.Body() + if body.Type() != pcommon.ValueTypeMap { + t.Error("the log body should be a structured map, rather than a flat string!") + return + } + out = body.Map().AsRaw() case LogLocationAttributes: out = logRecord.Attributes().AsRaw() } diff --git a/components/processors/observek8sattributesprocessor/types.go b/components/processors/observek8sattributesprocessor/types.go index 9e35715e9..5f9f9267b 100644 --- a/components/processors/observek8sattributesprocessor/types.go +++ b/components/processors/observek8sattributesprocessor/types.go @@ -116,15 +116,14 @@ type secretBodyAction interface { // This is useful, for instance, when we want to redact secrets' values, to // prevent generating attributes that contain secret's values before redacting // them. -// TODO [eg] check if there's actually no copying going on here -func (proc *K8sEventsProcessor) RunBodyActions(obj metav1.Object) error { +// Returns true if the object was modified by any action, false otherwise +func (proc *K8sEventsProcessor) RunBodyActions(obj metav1.Object) (bool, error) { switch typed := obj.(type) { case *corev1.Secret: - err := proc.runSecretBodyActions(typed) - return err + return true, proc.runSecretBodyActions(typed) } - return nil + return false, nil } func (m *K8sEventsProcessor) runSecretBodyActions(secret *corev1.Secret) error {