NewSetWithFiltered is not concurrent safe #7565

@khushijain21

Description

When NewSetWithFiltered is called concurrently with the same kvs slice, each call reads and writes the elements of that slice in memory, causing a data race between the stable sort (set.go:212 in the trace below) and a later write (set.go:230 in the trace below).

This is especially visible when using named loggers in the otelzap bridge.

See the trace below:
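
For illustration, here is a minimal stdlib-only sketch of the hazard and of one possible caller-side workaround. It does not use the otel packages: `KeyValue`, `sortDedupInPlace`, and `sortDedupCopy` are hypothetical stand-ins, and `sortDedupInPlace` only mimics the sort-then-compact pattern the trace points at; it is not the library's code. The assumption is that giving each call its own copy of the slice removes the shared writes.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
	"sync"
)

// KeyValue is a hypothetical stand-in for attribute.KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// sortDedupInPlace sorts the caller's slice in place and then compacts
// duplicate keys in place, keeping the last value for each key.
// Both steps write to the caller's backing array, so two goroutines
// passing the same slice would race.
func sortDedupInPlace(kvs []KeyValue) []KeyValue {
	slices.SortStableFunc(kvs, func(a, b KeyValue) int {
		return cmp.Compare(a.Key, b.Key)
	})
	out := kvs[:0]
	for i, kv := range kvs {
		if i+1 < len(kvs) && kvs[i+1].Key == kv.Key {
			continue // a later duplicate of this key wins
		}
		out = append(out, kv)
	}
	return out
}

// sortDedupCopy clones the input first, so concurrent callers only
// read the shared slice and never write to it.
func sortDedupCopy(kvs []KeyValue) []KeyValue {
	return sortDedupInPlace(slices.Clone(kvs))
}

func main() {
	shared := []KeyValue{
		{Key: "b", Value: "2"},
		{Key: "a", Value: "1"},
		{Key: "a", Value: "0"},
	}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Replacing this call with sortDedupInPlace(shared) should
			// make `go run -race` report a data race.
			_ = sortDedupCopy(shared)
		}()
	}
	wg.Wait()

	fmt.Println(sortDedupCopy(shared))
}
```

Running this with `go run -race` should stay quiet as written. The copy costs one extra allocation per call, which may or may not be acceptable on a hot logging path.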

datarace.txt
2025-10-29T13:13:31.699-0300    debug   publisher       pipeline/client.go:150  client: closing processors      {"resource": {"service.instance.id": "b4ea0054-ba01-4d6f-be6a-5e09be5dd75b", "service.name": "./elastic-agent", "service.version": "9.3.0"}, "otelcol.component.id": "filebeatreceiver", "otelcol.component.kind": "receiver", "otelcol.signal": "logs", "service.name": "filebeat", "ecs.version": "1.6.0"}
2025-10-29T13:13:31.699-0300    debug   publisher       pipeline/client.go:155  client: done closing processors {"resource": {"service.instance.id": "b4ea0054-ba01-4d6f-be6a-5e09be5dd75b", "service.name": "./elastic-agent", "service.version": "9.3.0"}, "otelcol.component.id": "filebeatreceiver", "otelcol.component.kind": "receiver", "otelcol.signal": "logs", "service.name": "filebeat", "ecs.version": "1.6.0"}
2025-10-29T13:13:31.699-0300    debug   publisher.memqueue      memqueue/ackloop.go:80  ackloop: return ack to broker loop:1    {"resource": {"service.instance.id": "b4ea0054-ba01-4d6f-be6a-5e09be5dd75b", "service.name": "./elastic-agent", "service.version": "9.3.0"}, "otelcol.component.id": "filebeatreceiver", "otelcol.component.kind": "receiver", "otelcol.signal": "logs", "service.name": "filebeat", "ecs.version": "1.6.0"}
==================
WARNING: DATA RACE
Read at 0x00c0016416c0 by goroutine 87:
  slices.insertionSortCmpFunc[go.shape.struct { Key go.opentelemetry.io/otel/attribute.Key; Value go.opentelemetry.io/otel/attribute.Value }]()
      /home/mauri870/git/go/src/slices/zsortanyfunc.go:12 +0x52a
  slices.stableCmpFunc[go.shape.struct { Key go.opentelemetry.io/otel/attribute.Key; Value go.opentelemetry.io/otel/attribute.Value }]()
      /home/mauri870/git/go/src/slices/zsortanyfunc.go:343 +0x43b
  slices.SortStableFunc[go.shape.[]go.opentelemetry.io/otel/attribute.KeyValue,go.shape.struct { Key go.opentelemetry.io/otel/attribute.Key; Value go.opentelemetry.io/otel/attribute.Value }]()
      /home/mauri870/git/go/src/slices/sort.go:38 +0x92
  go.opentelemetry.io/otel/attribute.NewSetWithFiltered()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/[email protected]/attribute/set.go:212 +0x54
  go.opentelemetry.io/otel/attribute.NewSet()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/[email protected]/attribute/set.go:184 +0x3c
  go.opentelemetry.io/contrib/bridges/otelzap.NewCore.WithInstrumentationAttributes.func3()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/[email protected]/logger.go:123 +0x98
  go.opentelemetry.io/otel/log.loggerOptionFunc.applyLogger()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/[email protected]/logger.go:105 +0xe7
  go.opentelemetry.io/otel/log.NewLoggerConfig()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/[email protected]/logger.go:80 +0x282
  go.opentelemetry.io/otel/sdk/log.(*LoggerProvider).Logger()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/provider.go:116 +0x2fc
  go.opentelemetry.io/contrib/bridges/otelzap.(*Core).Write()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/contrib/bridges/[email protected]/core.go:236 +0x69a
  go.opentelemetry.io/collector/internal/telemetry/componentattribute.(*otelTeeCoreWithAttributes).Write()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/internal/[email protected]/componentattribute/logger_zap.go:136 +0x278
  go.elastic.co/ecszap.core.Write()
      /home/mauri870/gopath/pkg/mod/go.elastic.co/[email protected]/core.go:75 +0x2f0
  go.elastic.co/ecszap.(*core).Write()
      <autogenerated>:1 +0xeb
  go.uber.org/zap/zapcore.(*CheckedEntry).Write()
      /home/mauri870/gopath/pkg/mod/go.uber.org/[email protected]/zapcore/entry.go:253 +0x215
  go.uber.org/zap.(*SugaredLogger).log()
      /home/mauri870/gopath/pkg/mod/go.uber.org/[email protected]/sugar.go:355 +0x12c
  go.uber.org/zap.(*SugaredLogger).Debug()
      /home/mauri870/gopath/pkg/mod/go.uber.org/[email protected]/sugar.go:149 +0x329
  github.com/elastic/elastic-agent-libs/logp.(*Logger).Debug()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/[email protected]/logp/logger.go:181 +0x2e4
  github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/queue/memqueue/ackloop.go:80 +0xe5
  github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).run()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/queue/memqueue/ackloop.go:54 +0x3b1
  github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func2()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/queue/memqueue/broker.go:173 +0x95

Previous write at 0x00c0016416c0 by goroutine 138:
  go.opentelemetry.io/otel/attribute.NewSetWithFiltered()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/[email protected]/attribute/set.go:230 +0x2e9
  go.opentelemetry.io/otel/attribute.NewSet()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/[email protected]/attribute/set.go:184 +0x3c
  go.opentelemetry.io/contrib/bridges/otelzap.NewCore.WithInstrumentationAttributes.func3()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/[email protected]/logger.go:123 +0x98
  go.opentelemetry.io/otel/log.loggerOptionFunc.applyLogger()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/[email protected]/logger.go:105 +0xe7
  go.opentelemetry.io/otel/log.NewLoggerConfig()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/[email protected]/logger.go:80 +0x282
  go.opentelemetry.io/otel/sdk/log.(*LoggerProvider).Logger()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/provider.go:116 +0x2fc
  go.opentelemetry.io/contrib/bridges/otelzap.(*Core).Write()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/contrib/bridges/[email protected]/core.go:236 +0x69a
  go.opentelemetry.io/collector/internal/telemetry/componentattribute.(*otelTeeCoreWithAttributes).Write()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/internal/[email protected]/componentattribute/logger_zap.go:136 +0x278
  go.elastic.co/ecszap.core.Write()
      /home/mauri870/gopath/pkg/mod/go.elastic.co/[email protected]/core.go:75 +0x2f0
  go.elastic.co/ecszap.(*core).Write()
      <autogenerated>:1 +0xeb
  go.uber.org/zap/zapcore.(*CheckedEntry).Write()
      /home/mauri870/gopath/pkg/mod/go.uber.org/[email protected]/zapcore/entry.go:253 +0x215
  go.uber.org/zap.(*SugaredLogger).log()
      /home/mauri870/gopath/pkg/mod/go.uber.org/[email protected]/sugar.go:355 +0x12c
  go.uber.org/zap.(*SugaredLogger).Debug()
      /home/mauri870/gopath/pkg/mod/go.uber.org/[email protected]/sugar.go:149 +0x668
  github.com/elastic/elastic-agent-libs/logp.(*Logger).Debug()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/[email protected]/logp/logger.go:181 +0x623
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/pipeline/client.go:155 +0x4f9
  github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Close()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/beater/channels.go:142 +0x41
  github.com/elastic/beats/v7/filebeat/input/v2/input-stateless.configuredInput.Run.deferwrap1()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/input/v2/input-stateless/stateless.go:93 +0x39
  runtime.deferreturn()
      /home/mauri870/git/go/src/runtime/panic.go:663 +0x5d
  github.com/elastic/beats/v7/filebeat/input/v2/input-stateless.(*configuredInput).Run()
      <autogenerated>:1 +0xd2
  github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/input/v2/compat/compat.go:162 +0x5ab

Goroutine 87 (running) created at:
  github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/queue/memqueue/broker.go:171 +0x16a
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.queueFactoryForUserConfig.FactoryForSettings.func1()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/queue/memqueue/broker.go:149 +0x8e
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*outputController).createQueueIfNeeded()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/pipeline/controller.go:275 +0x23b
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*outputController).Set()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/pipeline/controller.go:124 +0xe4
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.New()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/pipeline/pipeline.go:170 +0x71a
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/libbeat/publisher/pipeline/module.go:100 +0x3a4
  github.com/elastic/beats/v7/x-pack/libbeat/cmd/instance.NewBeatForReceiver()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/x-pack/libbeat/cmd/instance/beat.go:293 +0x4411
  github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.createReceiver()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/x-pack/filebeat/fbreceiver/factory.go:48 +0xaef
  go.opentelemetry.io/collector/receiver.(*factory).CreateLogs()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/receiver.go:177 +0x2c4
  go.opentelemetry.io/collector/service/internal/builders.(*ReceiverBuilder).CreateLogs()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/internal/builders/receiver.go:86 +0x3a6
  go.opentelemetry.io/collector/service/internal/graph.(*receiverNode).buildComponent()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/internal/graph/receiver.go:85 +0x10b7
  go.opentelemetry.io/collector/service/internal/graph.(*Graph).buildComponents()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/internal/graph/graph.go:304 +0x2e4
  go.opentelemetry.io/collector/service/internal/graph.Build()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/internal/graph/graph.go:91 +0x7ec
  go.opentelemetry.io/collector/service.(*Service).initGraph()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/service.go:307 +0x3cd
  go.opentelemetry.io/collector/service.New()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/service.go:198 +0x1c9c
  go.opentelemetry.io/collector/otelcol.(*Collector).setupConfigurationComponents()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/collector.go:195 +0x159d
  go.opentelemetry.io/collector/otelcol.(*Collector).Run()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/[email protected]/collector.go:310 +0x6a
  github.com/elastic/elastic-agent/internal/pkg/otel.Run()
      /home/mauri870/git/elastic/elastic-agent/internal/pkg/otel/run.go:41 +0x2a4
  github.com/elastic/elastic-agent/internal/pkg/agent/cmd.RunCollector()
      /home/mauri870/git/elastic/elastic-agent/internal/pkg/agent/cmd/otel.go:124 +0x556
  github.com/elastic/elastic-agent/internal/pkg/agent/cmd.newOtelCommandWithArgs.func1()
      /home/mauri870/git/elastic/elastic-agent/internal/pkg/agent/cmd/otel.go:58 +0x2a4
  github.com/spf13/cobra.(*Command).execute()
      /home/mauri870/gopath/pkg/mod/github.com/spf13/[email protected]/command.go:1015 +0x119b
  github.com/spf13/cobra.(*Command).ExecuteC()
      /home/mauri870/gopath/pkg/mod/github.com/spf13/[email protected]/command.go:1148 +0x797
  github.com/spf13/cobra.(*Command).Execute()
      /home/mauri870/gopath/pkg/mod/github.com/spf13/[email protected]/command.go:1071 +0x26
  main.main()
      /home/mauri870/git/elastic/elastic-agent/main.go:32 +0x27a

Goroutine 138 (running) created at:
  github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/input/v2/compat/compat.go:139 +0x166
  github.com/elastic/beats/v7/filebeat/beater.(*crawler).startInput()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/beater/crawler.go:149 +0x747
  github.com/elastic/beats/v7/filebeat/beater.(*crawler).Start()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/beater/crawler.go:80 +0x1d5
  github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Run()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/filebeat/beater/filebeat.go:444 +0x3631
  github.com/elastic/beats/v7/x-pack/libbeat/cmd/instance.(*BeatReceiver).Start()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/x-pack/libbeat/cmd/instance/receiver.go:128 +0x582
  github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.(*filebeatReceiver).Start.func1()
      /home/mauri870/gopath/pkg/mod/github.com/elastic/beats/[email protected]/x-pack/filebeat/fbreceiver/receiver.go:28 +0x175
==================
2025-10-29T13:13:31.699-0300    info    input.benchmark compat/compat.go:166    Input 'benchmark' stopped (goroutine)   {"resource": {"service.instance.id": "b4ea0054-ba01-4d6f-be6a-5e09be5dd75b", "service.name": "./elastic-agent", "service.version": "9.3.0"}, "otelcol.component.id": "filebeatreceiver", "otelcol.component.kind": "receiver", "otelcol.signal": "logs", "service.name": "filebeat", "id": " C52671FD5B94182", "ecs.version": "1.6.0"}
2025-10-29T13:13:31.700-0300    info    input.benchmark.metric_registry inputmon/input.go:198   unregistering   {"resource": {"service.instance.id": "b4ea0054-ba01-4d6f-be6a-5e09be5dd75b", "service.name": "./elastic-agent", "service.version": "9.3.0"}, "otelcol.component.id": "filebeatreceiver", "otelcol.component.kind": "receiver", "otelcol.signal": "logs", "service.name": "filebeat", "id": " C52671FD5B94182", "registry_id": " C52671FD5B94182", "input_id": " C52671FD5B94182", "input_type": "benchmark", "ecs.version": "1.6.0"}
2025-10-29T13:13:31.700-0300    debug   publisher.memqueue      memqueue/ackloop.go:82  ackloop:  done send ack {"resource": {"service.instance.id": "b4ea0054-ba01-4d6f-be6a-5e09be5dd75b", "service.name": "./elastic-agent", "service.version": "9.3.0"}, "otelcol.component.id": "filebeatreceiver", "otelcol.component.kind": "receiver", "otelcol.signal": "logs", "service.name": "filebeat", "ecs.version": "1.6.0"}

Labels: bug, response needed
