Skip to content

Commit ba17897

Browse files
authored
refactor!: update Telemetry Processor logic (#1180)
1 parent 5d2aa89 commit ba17897

File tree

14 files changed

+123
-529
lines changed

14 files changed

+123
-529
lines changed

_examples/metrics/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var meter sentry.Meter
1515

1616
func main() {
1717
err := sentry.Init(sentry.ClientOptions{
18-
Dsn: "https://53eb618df7695aaea90bcc816ff537e0@o447951.ingest.us.sentry.io/5774600",
18+
Dsn: "",
1919
Debug: true,
2020
EnableTracing: true,
2121
TracesSampleRate: 1.0,

client.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,10 @@ type Client struct {
282282
sdkVersion string
283283
// Transport is read-only. Replacing the transport of an existing client is
284284
// not supported, create a new client instead.
285-
Transport Transport
286-
batchLogger *logBatchProcessor
287-
batchMeter *metricBatchProcessor
288-
telemetryBuffer *telemetry.Buffer
285+
Transport Transport
286+
batchLogger *logBatchProcessor
287+
batchMeter *metricBatchProcessor
288+
telemetryProcessor *telemetry.Processor
289289
}
290290

291291
// NewClient creates and returns an instance of Client configured using
@@ -391,9 +391,9 @@ func NewClient(options ClientOptions) (*Client, error) {
391391

392392
client.setupTransport()
393393

394-
// noop Telemetry Buffers fow now
394+
// noop Telemetry Buffers and Processor fow now
395395
// if !options.DisableTelemetryBuffer {
396-
// client.setupTelemetryBuffer()
396+
// client.setupTelemetryProcessor()
397397
// } else
398398
if options.EnableLogs {
399399
client.batchLogger = newLogBatchProcessor(&client)
@@ -426,7 +426,7 @@ func (client *Client) setupTransport() {
426426
client.Transport = transport
427427
}
428428

429-
func (client *Client) setupTelemetryBuffer() { // nolint: unused
429+
func (client *Client) setupTelemetryProcessor() { // nolint: unused
430430
if client.options.DisableTelemetryBuffer {
431431
return
432432
}
@@ -436,10 +436,10 @@ func (client *Client) setupTelemetryBuffer() { // nolint: unused
436436
return
437437
}
438438

439-
// We currently disallow using custom Transport with the new Telemetry Buffer, due to the difference in transport signatures.
439+
// We currently disallow using custom Transport with the new Telemetry Processor, due to the difference in transport signatures.
440440
// The option should be enabled when the new Transport interface signature changes.
441441
if client.options.Transport != nil {
442-
debuglog.Println("Cannot enable Telemetry Buffer with custom Transport: fallback to old transport")
442+
debuglog.Println("Cannot enable Telemetry Processor/Buffers with custom Transport: fallback to old transport")
443443
if client.options.EnableLogs {
444444
client.batchLogger = newLogBatchProcessor(client)
445445
client.batchLogger.Start()
@@ -461,20 +461,20 @@ func (client *Client) setupTelemetryBuffer() { // nolint: unused
461461
})
462462
client.Transport = &internalAsyncTransportAdapter{transport: transport}
463463

464-
storage := map[ratelimit.Category]telemetry.Storage[protocol.EnvelopeItemConvertible]{
465-
ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
466-
ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0),
467-
ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
468-
ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
469-
ratelimit.CategoryTraceMetric: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTraceMetric, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
464+
buffers := map[ratelimit.Category]telemetry.Buffer[protocol.TelemetryItem]{
465+
ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
466+
ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0),
467+
ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryLog, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
468+
ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
469+
ratelimit.CategoryTraceMetric: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTraceMetric, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
470470
}
471471

472472
sdkInfo := &protocol.SdkInfo{
473473
Name: client.sdkIdentifier,
474474
Version: client.sdkVersion,
475475
}
476476

477-
client.telemetryBuffer = telemetry.NewBuffer(storage, transport, &client.dsn.Dsn, sdkInfo)
477+
client.telemetryProcessor = telemetry.NewProcessor(buffers, transport, &client.dsn.Dsn, sdkInfo)
478478
}
479479

480480
func (client *Client) setupIntegrations() {
@@ -547,7 +547,7 @@ func (client *Client) CaptureCheckIn(checkIn *CheckIn, monitorConfig *MonitorCon
547547

548548
// CaptureEvent captures an event on the currently active client if any.
549549
//
550-
// The event must already be assembled. Typically code would instead use
550+
// The event must already be assembled. Typically, code would instead use
551551
// the utility methods like CaptureException. The return value is the
552552
// event ID. In case Sentry is disabled or event was dropped, the return value will be nil.
553553
func (client *Client) CaptureEvent(event *Event, hint *EventHint, scope EventModifier) *EventID {
@@ -567,8 +567,8 @@ func (client *Client) captureLog(log *Log, _ *Scope) bool {
567567
}
568568
}
569569

570-
if client.telemetryBuffer != nil {
571-
if !client.telemetryBuffer.Add(log) {
570+
if client.telemetryProcessor != nil {
571+
if !client.telemetryProcessor.Add(log) {
572572
debuglog.Print("Dropping log: telemetry buffer full or category missing")
573573
return false
574574
}
@@ -595,8 +595,8 @@ func (client *Client) captureMetric(metric *Metric, _ *Scope) bool {
595595
}
596596
}
597597

598-
if client.telemetryBuffer != nil {
599-
if !client.telemetryBuffer.Add(metric) {
598+
if client.telemetryProcessor != nil {
599+
if !client.telemetryProcessor.Add(metric) {
600600
debuglog.Printf("Dropping metric: telemetry buffer full or category missing")
601601
return false
602602
}
@@ -673,7 +673,7 @@ func (client *Client) RecoverWithContext(
673673
// the network synchronously, configure it to use the HTTPSyncTransport in the
674674
// call to Init.
675675
func (client *Client) Flush(timeout time.Duration) bool {
676-
if client.batchLogger != nil || client.batchMeter != nil || client.telemetryBuffer != nil {
676+
if client.batchLogger != nil || client.batchMeter != nil || client.telemetryProcessor != nil {
677677
ctx, cancel := context.WithTimeout(context.Background(), timeout)
678678
defer cancel()
679679
return client.FlushWithContext(ctx)
@@ -700,8 +700,8 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
700700
if client.batchMeter != nil {
701701
client.batchMeter.Flush(ctx.Done())
702702
}
703-
if client.telemetryBuffer != nil {
704-
return client.telemetryBuffer.FlushWithContext(ctx)
703+
if client.telemetryProcessor != nil {
704+
return client.telemetryProcessor.FlushWithContext(ctx)
705705
}
706706
return client.Transport.FlushWithContext(ctx)
707707
}
@@ -711,8 +711,8 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
711711
// Close should be called after Flush and before terminating the program
712712
// otherwise some events may be lost.
713713
func (client *Client) Close() {
714-
if client.telemetryBuffer != nil {
715-
client.telemetryBuffer.Close(5 * time.Second)
714+
if client.telemetryProcessor != nil {
715+
client.telemetryProcessor.Close(5 * time.Second)
716716
}
717717
if client.batchLogger != nil {
718718
client.batchLogger.Shutdown()
@@ -840,8 +840,8 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod
840840
}
841841
}
842842

843-
if client.telemetryBuffer != nil {
844-
if !client.telemetryBuffer.Add(event) {
843+
if client.telemetryProcessor != nil {
844+
if !client.telemetryProcessor.Add(event) {
845845
debuglog.Println("Event dropped: telemetry buffer full or unavailable")
846846
}
847847
} else {

client_test.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,27 +1036,6 @@ func TestClientSetsUpTransport(t *testing.T) {
10361036
require.IsType(t, &noopTransport{}, client.Transport)
10371037
}
10381038

1039-
// func TestClient_SetupTelemetryBuffer_WithDSN(t *testing.T) {
1040-
// client, err := NewClient(ClientOptions{
1041-
// Dsn: "https://public@localhost/1",
1042-
// })
1043-
// if err != nil {
1044-
// t.Fatalf("unexpected error: %v", err)
1045-
// }
1046-
//
1047-
// if client.telemetryBuffer == nil {
1048-
// t.Fatal("expected telemetryBuffer to be initialized")
1049-
// }
1050-
//
1051-
// if _, ok := client.Transport.(*internalAsyncTransportAdapter); !ok {
1052-
// t.Fatalf("expected internalAsyncTransportAdapter, got %T", client.Transport)
1053-
// }
1054-
//
1055-
// if !client.telemetryBuffer.Add(NewEvent()) {
1056-
// t.Fatal("expected Add to succeed with default buffers")
1057-
// }
1058-
//}
1059-
10601039
func TestClient_SetupTelemetryBuffer_NoDSN(t *testing.T) {
10611040
var buf bytes.Buffer
10621041
debuglog.SetOutput(&buf)
@@ -1067,8 +1046,8 @@ func TestClient_SetupTelemetryBuffer_NoDSN(t *testing.T) {
10671046
t.Fatalf("unexpected error: %v", err)
10681047
}
10691048

1070-
if client.telemetryBuffer != nil {
1071-
t.Fatal("expected telemetryBuffer to be nil when DSN is missing")
1049+
if client.telemetryProcessor != nil {
1050+
t.Fatal("expected telemetryProcessor to be nil when DSN is missing")
10721051
}
10731052

10741053
if _, ok := client.Transport.(*noopTransport); !ok {

interfaces.go

Lines changed: 0 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -487,80 +487,6 @@ func (e *Event) SetException(exception error, maxErrorDepth int) {
487487
e.Exception = exceptions
488488
}
489489

490-
// ToEnvelope converts the Event to a Sentry envelope.
491-
// This includes the event data and any attachments as separate envelope items.
492-
func (e *Event) ToEnvelope(dsn *protocol.Dsn) (*protocol.Envelope, error) {
493-
return e.ToEnvelopeWithTime(dsn, time.Now())
494-
}
495-
496-
// ToEnvelopeWithTime converts the Event to a Sentry envelope with a specific sentAt time.
497-
// This is primarily useful for testing with predictable timestamps.
498-
func (e *Event) ToEnvelopeWithTime(dsn *protocol.Dsn, sentAt time.Time) (*protocol.Envelope, error) {
499-
// Create envelope header with trace context
500-
trace := make(map[string]string)
501-
if dsc := e.sdkMetaData.dsc; dsc.HasEntries() {
502-
for k, v := range dsc.Entries {
503-
trace[k] = v
504-
}
505-
}
506-
507-
header := &protocol.EnvelopeHeader{
508-
EventID: string(e.EventID),
509-
SentAt: sentAt,
510-
Trace: trace,
511-
}
512-
513-
if dsn != nil {
514-
header.Dsn = dsn.String()
515-
}
516-
517-
header.Sdk = &e.Sdk
518-
519-
envelope := protocol.NewEnvelope(header)
520-
521-
eventBody, err := json.Marshal(e)
522-
if err != nil {
523-
// Try fallback: remove problematic fields and retry
524-
e.Breadcrumbs = nil
525-
e.Contexts = nil
526-
e.Extra = map[string]interface{}{
527-
"info": fmt.Sprintf("Could not encode original event as JSON. "+
528-
"Succeeded by removing Breadcrumbs, Contexts and Extra. "+
529-
"Please verify the data you attach to the scope. "+
530-
"Error: %s", err),
531-
}
532-
533-
eventBody, err = json.Marshal(e)
534-
if err != nil {
535-
return nil, fmt.Errorf("event could not be marshaled even with fallback: %w", err)
536-
}
537-
538-
DebugLogger.Printf("Event marshaling succeeded with fallback after removing problematic fields")
539-
}
540-
541-
var mainItem *protocol.EnvelopeItem
542-
switch e.Type {
543-
case transactionType:
544-
mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeTransaction, eventBody)
545-
case checkInType:
546-
mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeCheckIn, eventBody)
547-
case logEvent.Type:
548-
mainItem = protocol.NewLogItem(len(e.Logs), eventBody)
549-
case traceMetricEvent.Type:
550-
mainItem = protocol.NewTraceMetricItem(len(e.Metrics), eventBody)
551-
default:
552-
mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeEvent, eventBody)
553-
}
554-
555-
envelope.AddItem(mainItem)
556-
for _, attachment := range e.Attachments {
557-
attachmentItem := protocol.NewAttachmentItem(attachment.Filename, attachment.ContentType, attachment.Payload)
558-
envelope.AddItem(attachmentItem)
559-
}
560-
561-
return envelope, nil
562-
}
563-
564490
// ToEnvelopeItem converts the Event to a Sentry envelope item.
565491
func (e *Event) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
566492
eventBody, err := json.Marshal(e)
@@ -831,21 +757,6 @@ type Log struct {
831757
Attributes map[string]Attribute `json:"attributes,omitempty"`
832758
}
833759

834-
// ToEnvelopeItem converts the Log to a Sentry envelope item for batching.
835-
func (l *Log) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
836-
logData, err := json.Marshal(l)
837-
if err != nil {
838-
return nil, err
839-
}
840-
841-
return &protocol.EnvelopeItem{
842-
Header: &protocol.EnvelopeItemHeader{
843-
Type: protocol.EnvelopeItemTypeLog,
844-
},
845-
Payload: logData,
846-
}, nil
847-
}
848-
849760
// GetCategory returns the rate limit category for logs.
850761
func (l *Log) GetCategory() ratelimit.Category {
851762
return ratelimit.CategoryLog
@@ -930,20 +841,6 @@ type Metric struct {
930841
Attributes map[string]Attribute `json:"attributes,omitempty"`
931842
}
932843

933-
func (m *Metric) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
934-
metricData, err := json.Marshal(m)
935-
if err != nil {
936-
return nil, err
937-
}
938-
939-
return &protocol.EnvelopeItem{
940-
Header: &protocol.EnvelopeItemHeader{
941-
Type: protocol.EnvelopeItemTypeTraceMetric,
942-
},
943-
Payload: metricData,
944-
}, nil
945-
}
946-
947844
// MarshalJSON converts a Metric to JSON that skips SpanID and timestamp when zero.
948845
func (m *Metric) MarshalJSON() ([]byte, error) {
949846
type metric Metric

0 commit comments

Comments
 (0)