Skip to content

Commit 52cb35e

Browse files
committed
Fix UB in batch processor when trying to read bytes size after adding request to pipeline
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent e5e965c commit 52cb35e

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

processor/batchprocessor/batch_processor.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,8 @@ func (b *shard[T]) resetTimer() {
260260
func (b *shard[T]) sendItems(trigger trigger) {
261261
sent, req := b.batch.split(b.processor.sendBatchMaxSize)
262262

263-
err := b.batch.export(b.exportCtx, req)
264-
if err != nil {
265-
b.processor.logger.Warn("Sender failed", zap.Error(err))
266-
return
267-
}
268-
var bytes int
269263
bpt := b.processor.telemetry
270-
264+
var bytes int
271265
// Check if the instrument is enabled to calculate the size of the batch in bytes.
272266
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled
273267
batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes
@@ -276,6 +270,11 @@ func (b *shard[T]) sendItems(trigger trigger) {
276270
bytes = b.batch.sizeBytes(req)
277271
}
278272

273+
err := b.batch.export(b.exportCtx, req)
274+
if err != nil {
275+
b.processor.logger.Warn("Sender failed", zap.Error(err))
276+
return
277+
}
279278
bpt.record(trigger, int64(sent), int64(bytes))
280279
}
281280

@@ -442,14 +441,14 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
442441

443442
// add updates current batchTraces by adding new TraceData object
444443
func (bt *batchTraces) add(td ptrace.Traces) {
444+
defer pref.UnrefTraces(td)
445445
newSpanCount := td.SpanCount()
446446
if newSpanCount == 0 {
447447
return
448448
}
449449

450450
bt.spanCount += newSpanCount
451451
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
452-
pref.UnrefTraces(td)
453452
}
454453

455454
func (bt *batchTraces) sizeBytes(td ptrace.Traces) int {
@@ -521,13 +520,13 @@ func (bm *batchMetrics) itemCount() int {
521520
}
522521

523522
func (bm *batchMetrics) add(md pmetric.Metrics) {
523+
defer pref.UnrefMetrics(md)
524524
newDataPointCount := md.DataPointCount()
525525
if newDataPointCount == 0 {
526526
return
527527
}
528528
bm.dataPointCount += newDataPointCount
529529
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
530-
pref.UnrefMetrics(md)
531530
}
532531

533532
type batchLogs struct {
@@ -571,11 +570,11 @@ func (bl *batchLogs) itemCount() int {
571570
}
572571

573572
func (bl *batchLogs) add(ld plog.Logs) {
573+
defer pref.UnrefLogs(ld)
574574
newLogsCount := ld.LogRecordCount()
575575
if newLogsCount == 0 {
576576
return
577577
}
578578
bl.logCount += newLogsCount
579579
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
580-
pref.UnrefLogs(ld)
581580
}

0 commit comments

Comments
 (0)