Skip to content

fix: Override log body with structured object #123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions components/processors/observek8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,24 @@ func (kep *K8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (p

// ALWAYS RUN BODY ACTIONS FIRST
// The attributes should always be computed on the MODIFIED body.
err := kep.RunBodyActions(object)
modified, err := kep.RunBodyActions(object)
if err != nil {
kep.logger.Error("could not run body actions", zap.Error(err))
continue
}
// We now re-marshal the object
reMarshsalledBody, err := json.Marshal(object)
if err != nil {
kep.logger.Error("could not re-marshal body", zap.Error(err))
continue
// Once we have the modified k8s API object, we need to re-set
// the log body. Since this process is quite expensive, we make
// sure we do it only if the object has been modified at all.
if modified {
err := kep.setNewBody(lr, object)
if err != nil {
// We have already logged the error inside setNewBody
continue
}
}
// And update the body of the log record
lr.Body().SetStr(string(reMarshsalledBody))

// Now onto the facets (extra attributes that are computed from
// the object without changing the object itself).

// Add attributes["observe_transform"]["facets"] if it doesn't exist
transform, exists := lr.Attributes().Get("observe_transform")
Expand Down Expand Up @@ -424,3 +429,28 @@ func mapPut(theMap pcommon.Map, key string, value any) error {
return nil

}

// Sets the log body to the map[string]any representation of the object.
// We cannot simply set the log body via SetStr() and use the marshalled object,
// otherwise downstream processors won't be able to index into the object and
// extract additional meaning!
// The best approach is to convert it as such:
// {k8s object} --marshal--> {json []bytes} --Unmarshal--> {map[string]any} -> set the body FromRaw()
func (kep *K8sEventsProcessor) setNewBody(lr plog.LogRecord, object metav1.Object) error {
bodyJson, err := json.Marshal(object)
if err != nil {
kep.logger.Error("could not marshal body to JSON after modifying it", zap.Error(err))
return err
}
var bodyMap map[string]any
// This will never error out, since we just marshalled it ourselves
json.Unmarshal(bodyJson, &bodyMap)

err = lr.Body().SetEmptyMap().FromRaw(bodyMap)

if err != nil {
kep.logger.Error("could not set body from raw", zap.Error(err))
return err
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jmespath/go-jmespath"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -75,10 +76,12 @@ func runTest(t *testing.T, test k8sEventProcessorTest) {
// Pick the right part of the log to query
switch query.location {
case LogLocationBody:
// The body is JSON string and therefore must be unmarshalled into
// map[string]any to be able to query it with jmespath.
body := logRecord.Body().AsString()
json.Unmarshal([]byte(body), &out)
body := logRecord.Body()
if body.Type() != pcommon.ValueTypeMap {
t.Error("the log body should be a structured map, rather than a flat string!")
return
}
out = body.Map().AsRaw()
case LogLocationAttributes:
out = logRecord.Attributes().AsRaw()
}
Expand Down
9 changes: 4 additions & 5 deletions components/processors/observek8sattributesprocessor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,14 @@ type secretBodyAction interface {
// This is useful, for instance, when we want to redact secrets' values, to
// prevent generating attributes that contain secret's values before redacting
// them.
// TODO [eg] check if there's actually no copying going on here
func (proc *K8sEventsProcessor) RunBodyActions(obj metav1.Object) error {
// Returns true if the object was modified by any action, false otherwise
func (proc *K8sEventsProcessor) RunBodyActions(obj metav1.Object) (bool, error) {
switch typed := obj.(type) {
case *corev1.Secret:
err := proc.runSecretBodyActions(typed)
return err
return true, proc.runSecretBodyActions(typed)
}

return nil
return false, nil
}

func (m *K8sEventsProcessor) runSecretBodyActions(secret *corev1.Secret) error {
Expand Down
Loading