Skip to content

Commit 2668331

Browse files
authored
Allow flush to be used multiple times (#1051)
1 parent 221b2ac commit 2668331

File tree

3 files changed

+152
-22
lines changed

3 files changed

+152
-22
lines changed

batch_logger.go

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@ const (
1212
)
1313

1414
type BatchLogger struct {
15-
client *Client
16-
logCh chan Log
17-
cancel context.CancelFunc
18-
wg sync.WaitGroup
19-
startOnce sync.Once
15+
client *Client
16+
logCh chan Log
17+
flushCh chan chan struct{}
18+
cancel context.CancelFunc
19+
wg sync.WaitGroup
20+
startOnce sync.Once
21+
shutdownOnce sync.Once
2022
}
2123

2224
func NewBatchLogger(client *Client) *BatchLogger {
2325
return &BatchLogger{
24-
client: client,
25-
logCh: make(chan Log, batchSize),
26+
client: client,
27+
logCh: make(chan Log, batchSize),
28+
flushCh: make(chan chan struct{}),
2629
}
2730
}
2831

@@ -35,17 +38,32 @@ func (l *BatchLogger) Start() {
3538
})
3639
}
3740

38-
func (l *BatchLogger) Flush() {
39-
if l.cancel != nil {
40-
l.cancel()
41-
l.wg.Wait()
41+
func (l *BatchLogger) Flush(timeout <-chan struct{}) {
42+
done := make(chan struct{})
43+
select {
44+
case l.flushCh <- done:
45+
select {
46+
case <-done:
47+
case <-timeout:
48+
}
49+
case <-timeout:
4250
}
4351
}
4452

53+
func (l *BatchLogger) Shutdown() {
54+
l.shutdownOnce.Do(func() {
55+
if l.cancel != nil {
56+
l.cancel()
57+
l.wg.Wait()
58+
}
59+
})
60+
}
61+
4562
func (l *BatchLogger) run(ctx context.Context) {
4663
defer l.wg.Done()
4764
var logs []Log
4865
timer := time.NewTimer(batchTimeout)
66+
defer timer.Stop()
4967

5068
for {
5169
select {
@@ -65,8 +83,27 @@ func (l *BatchLogger) run(ctx context.Context) {
6583
logs = nil
6684
}
6785
timer.Reset(batchTimeout)
86+
case done := <-l.flushCh:
87+
flushDrain:
88+
for {
89+
select {
90+
case log := <-l.logCh:
91+
logs = append(logs, log)
92+
default:
93+
break flushDrain
94+
}
95+
}
96+
97+
if len(logs) > 0 {
98+
l.processEvent(logs)
99+
logs = nil
100+
}
101+
if !timer.Stop() {
102+
<-timer.C
103+
}
104+
timer.Reset(batchTimeout)
105+
close(done)
68106
case <-ctx.Done():
69-
// Drain remaining logs from channel
70107
drain:
71108
for {
72109
select {

client.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,17 @@ func (client *Client) RecoverWithContext(
511511
// call to Init.
512512
func (client *Client) Flush(timeout time.Duration) bool {
513513
if client.batchLogger != nil {
514-
client.batchLogger.Flush()
514+
start := time.Now()
515+
timeoutCh := make(chan struct{})
516+
time.AfterFunc(timeout, func() {
517+
close(timeoutCh)
518+
})
519+
client.batchLogger.Flush(timeoutCh)
520+
// update the timeout with the time passed
521+
timeout -= time.Since(start)
522+
if timeout <= 0 {
523+
return false
524+
}
515525
}
516526
return client.Transport.Flush(timeout)
517527
}
@@ -530,7 +540,7 @@ func (client *Client) Flush(timeout time.Duration) bool {
530540

531541
func (client *Client) FlushWithContext(ctx context.Context) bool {
532542
if client.batchLogger != nil {
533-
client.batchLogger.Flush()
543+
client.batchLogger.Flush(ctx.Done())
534544
}
535545
return client.Transport.FlushWithContext(ctx)
536546
}

log_test.go

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/getsentry/sentry-go/attribute"
12+
"github.com/getsentry/sentry-go/internal/testutils"
1213
"github.com/google/go-cmp/cmp"
1314
"github.com/google/go-cmp/cmp/cmpopts"
1415
)
@@ -177,7 +178,7 @@ func Test_sentryLogger_MethodsWithFormat(t *testing.T) {
177178
// invalid attribute should be dropped
178179
l.SetAttributes(attribute.Builder{Key: "key.invalid", Value: attribute.Value{}})
179180
tt.logFunc(ctx, l)
180-
Flush(20 * time.Millisecond)
181+
Flush(testutils.FlushTimeout())
181182

182183
opts := cmp.Options{
183184
cmpopts.IgnoreFields(Log{}, "Timestamp"),
@@ -321,7 +322,7 @@ func Test_sentryLogger_MethodsWithoutFormat(t *testing.T) {
321322
ctx, mockTransport := setupMockTransport()
322323
l := NewLogger(ctx)
323324
tt.logFunc(ctx, l, tt.args)
324-
Flush(20 * time.Millisecond)
325+
Flush(testutils.FlushTimeout())
325326

326327
opts := cmp.Options{
327328
cmpopts.IgnoreFields(Log{}, "Timestamp"),
@@ -400,7 +401,7 @@ func Test_sentryLogger_Write(t *testing.T) {
400401
if n != len(msg) {
401402
t.Errorf("Write returned wrong byte count: got %d, want %d", n, len(msg))
402403
}
403-
Flush(20 * time.Millisecond)
404+
Flush(testutils.FlushTimeout())
404405

405406
gotEvents := mockTransport.Events()
406407
if len(gotEvents) != 1 {
@@ -426,7 +427,7 @@ func Test_sentryLogger_FlushAttributesAfterSend(t *testing.T) {
426427

427428
l.SetAttributes(attribute.String("string", "some str"))
428429
l.Warn(ctx, msg)
429-
Flush(20 * time.Millisecond)
430+
Flush(testutils.FlushTimeout())
430431

431432
gotEvents := mockTransport.Events()
432433
if len(gotEvents) != 1 {
@@ -444,7 +445,7 @@ func Test_batchLogger_Flush(t *testing.T) {
444445
ctx, mockTransport := setupMockTransport()
445446
l := NewLogger(context.Background())
446447
l.Info(ctx, "context done log")
447-
Flush(20 * time.Millisecond)
448+
Flush(testutils.FlushTimeout())
448449

449450
events := mockTransport.Events()
450451
if len(events) != 1 {
@@ -457,7 +458,7 @@ func Test_batchLogger_FlushWithContext(t *testing.T) {
457458
l := NewLogger(context.Background())
458459
l.Info(ctx, "context done log")
459460

460-
cancelCtx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
461+
cancelCtx, cancel := context.WithTimeout(context.Background(), testutils.FlushTimeout())
461462
FlushWithContext(cancelCtx)
462463
defer cancel()
463464

@@ -467,6 +468,88 @@ func Test_batchLogger_FlushWithContext(t *testing.T) {
467468
}
468469
}
469470

471+
func Test_batchLogger_FlushMultipleTimes(t *testing.T) {
472+
ctx, mockTransport := setupMockTransport()
473+
l := NewLogger(ctx)
474+
475+
for i := 0; i < 5; i++ {
476+
l.Info(ctx, "test")
477+
}
478+
479+
Flush(testutils.FlushTimeout())
480+
481+
events := mockTransport.Events()
482+
if len(events) != 1 {
483+
t.Logf("Got %d events instead of 1", len(events))
484+
for i, event := range events {
485+
t.Logf("Event %d: %d logs", i, len(event.Logs))
486+
}
487+
t.Fatalf("expected 1 event after first flush, got %d", len(events))
488+
}
489+
if len(events[0].Logs) != 5 {
490+
t.Fatalf("expected 5 logs in first batch, got %d", len(events[0].Logs))
491+
}
492+
493+
mockTransport.events = nil
494+
495+
for i := 0; i < 3; i++ {
496+
l.Info(ctx, "test")
497+
}
498+
499+
Flush(testutils.FlushTimeout())
500+
events = mockTransport.Events()
501+
if len(events) != 1 {
502+
t.Fatalf("expected 1 event after second flush, got %d", len(events))
503+
}
504+
if len(events[0].Logs) != 3 {
505+
t.Fatalf("expected 3 logs in second batch, got %d", len(events[0].Logs))
506+
}
507+
508+
mockTransport.events = nil
509+
510+
Flush(testutils.FlushTimeout())
511+
events = mockTransport.Events()
512+
if len(events) != 0 {
513+
t.Fatalf("expected 0 events after third flush with no logs, got %d", len(events))
514+
}
515+
}
516+
517+
func Test_batchLogger_Shutdown(t *testing.T) {
518+
ctx, mockTransport := setupMockTransport()
519+
l := NewLogger(ctx)
520+
for i := 0; i < 3; i++ {
521+
l.Info(ctx, "test")
522+
}
523+
524+
hub := GetHubFromContext(ctx)
525+
hub.Client().batchLogger.Shutdown()
526+
527+
events := mockTransport.Events()
528+
if len(events) != 1 {
529+
t.Fatalf("expected 1 event after shutdown, got %d", len(events))
530+
}
531+
if len(events[0].Logs) != 3 {
532+
t.Fatalf("expected 3 logs in shutdown batch, got %d", len(events[0].Logs))
533+
}
534+
535+
mockTransport.events = nil
536+
537+
// Test that shutdown can be called multiple times safely
538+
hub.Client().batchLogger.Shutdown()
539+
hub.Client().batchLogger.Shutdown()
540+
541+
events = mockTransport.Events()
542+
if len(events) != 0 {
543+
t.Fatalf("expected 0 events after multiple shutdowns, got %d", len(events))
544+
}
545+
546+
Flush(testutils.FlushTimeout())
547+
events = mockTransport.Events()
548+
if len(events) != 0 {
549+
t.Fatalf("expected 0 events after flush on shutdown logger, got %d", len(events))
550+
}
551+
}
552+
470553
func Test_sentryLogger_BeforeSendLog(t *testing.T) {
471554
ctx := context.Background()
472555
mockTransport := &MockTransport{}
@@ -492,7 +575,7 @@ func Test_sentryLogger_BeforeSendLog(t *testing.T) {
492575

493576
l := NewLogger(ctx)
494577
l.Info(ctx, "context done log")
495-
Flush(20 * time.Millisecond)
578+
Flush(testutils.FlushTimeout())
496579

497580
events := mockTransport.Events()
498581
if len(events) != 0 {
@@ -528,7 +611,7 @@ func Test_sentryLogger_TracePropagationWithTransaction(t *testing.T) {
528611
logger := NewLogger(txn.Context())
529612
logger.Info(txn.Context(), "message with tracing")
530613

531-
Flush(20 * time.Millisecond)
614+
Flush(testutils.FlushTimeout())
532615

533616
events := mockTransport.Events()
534617
if len(events) != 1 {

0 commit comments

Comments
 (0)