Skip to content

Commit deb3973

Browse files
authored
[*] release logrus.Entry immediately in BrokerHook.Fire() (#1303)
* [*] release `logrus.Entry` immediately in `BrokerHook.Fire()` Format synchronously in `Fire()` while the entry is owned by the calling goroutine, send `MessageType` through the channel, and `send()` delivers out without `logrus.Entry` copy and without `lastError` channel * add tests
1 parent 4460485 commit deb3973

File tree

2 files changed

+131
-28
lines changed

2 files changed

+131
-28
lines changed

internal/log/log_broker_hook.go

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ type MessageChanType chan MessageType
1919
type BrokerHook struct {
2020
highLoadTimeout time.Duration // wait this amount of time before skip log entry
2121
subscribers []MessageChanType //
22-
input chan *logrus.Entry
22+
input chan MessageType
2323
ctx context.Context
24-
lastError chan error
2524
level string
2625
mu *sync.Mutex
2726
formatter logrus.Formatter
@@ -34,8 +33,7 @@ const highLoadLimit = 200 * time.Millisecond
3433
func NewBrokerHook(ctx context.Context, level string) *BrokerHook {
3534
l := &BrokerHook{
3635
highLoadTimeout: highLoadLimit,
37-
input: make(chan *logrus.Entry, cacheLimit),
38-
lastError: make(chan error),
36+
input: make(chan MessageType, cacheLimit),
3937
ctx: ctx,
4038
level: level,
4139
mu: new(sync.Mutex),
@@ -79,18 +77,20 @@ func (hook *BrokerHook) Fire(entry *logrus.Entry) error {
7977
if hook.ctx.Err() != nil {
8078
return nil
8179
}
80+
hook.mu.Lock()
81+
f := hook.formatter
82+
hook.mu.Unlock()
83+
raw, err := f.Format(entry)
84+
if err != nil {
85+
return err
86+
}
8287
select {
83-
case hook.input <- entry:
88+
case hook.input <- MessageType(raw):
8489
// entry sent
8590
case <-time.After(hook.highLoadTimeout):
8691
// entry dropped due to a huge load, check stdout or file for detailed log
8792
}
88-
select {
89-
case err := <-hook.lastError:
90-
return err
91-
default:
92-
return nil
93-
}
93+
return nil
9494
}
9595

9696
// Levels returns the available logging levels
@@ -119,38 +119,29 @@ func (hook *BrokerHook) Levels() []logrus.Level {
119119

120120
// poll checks for incoming messages and caches them internally
121121
// until either a maximum amount is reached, or a timeout occurs.
122-
func (hook *BrokerHook) poll(input <-chan *logrus.Entry) {
122+
func (hook *BrokerHook) poll(input <-chan MessageType) {
123123
for {
124124
select {
125125
case <-hook.ctx.Done(): //check context with high priority
126126
return
127-
case entry := <-input:
128-
hook.send(entry)
127+
case msg := <-input:
128+
hook.send(msg)
129129
}
130130
}
131131
}
132132

133-
// send sends cached messages to the postgres server
134-
func (hook *BrokerHook) send(entry *logrus.Entry) {
133+
// send sends out a pre-formatted message to all subscribers
134+
func (hook *BrokerHook) send(msg MessageType) {
135135
hook.mu.Lock()
136136
defer hook.mu.Unlock()
137137
if len(hook.subscribers) == 0 {
138-
return // Nothing to do here.
138+
return
139139
}
140-
msg, err := hook.formatter.Format(entry)
141140
for _, subscriber := range hook.subscribers {
142141
select {
143-
case subscriber <- MessageType(msg):
142+
case subscriber <- msg:
144143
default:
145144
//no time to wait
146145
}
147146
}
148-
if err != nil {
149-
select {
150-
case hook.lastError <- err:
151-
//error sent to the logger
152-
default:
153-
//there is unprocessed error already
154-
}
155-
}
156147
}

internal/log/log_broker_hook_test.go

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,32 @@ package log
22

33
import (
44
"context"
5+
"sync"
56
"testing"
7+
"testing/synctest"
8+
"time"
69

10+
"github.com/sirupsen/logrus"
711
"github.com/stretchr/testify/assert"
812
)
913

14+
type errorFormatter struct{}
15+
16+
func (f *errorFormatter) Format(*logrus.Entry) ([]byte, error) {
17+
return nil, assert.AnError
18+
}
19+
20+
func TestFormatterError(t *testing.T) {
21+
hook := NewBrokerHook(t.Context(), "info")
22+
hook.SetBrokerFormatter(&errorFormatter{})
23+
entry := logrus.NewEntry(logrus.New())
24+
entry.Message = "hello broker"
25+
entry.Level = logrus.InfoLevel
26+
assert.Error(t, hook.Fire(entry))
27+
}
28+
1029
func TestRemoveSubscriber(t *testing.T) {
11-
hook := NewBrokerHook(context.Background(), "info")
30+
hook := NewBrokerHook(t.Context(), "info")
1231
msgChan1 := make(MessageChanType)
1332
msgChan2 := make(MessageChanType)
1433
hook.AddSubscriber(msgChan1)
@@ -23,3 +42,96 @@ func TestRemoveSubscriber(t *testing.T) {
2342
hook.RemoveSubscriber(msgChan2)
2443
assert.Equal(t, 0, len(hook.subscribers))
2544
}
45+
46+
func TestFire_CancelledContext(t *testing.T) {
47+
ctx, cancel := context.WithCancel(t.Context())
48+
cancel()
49+
hook := NewBrokerHook(ctx, "info")
50+
err := hook.Fire(logrus.NewEntry(logrus.New()))
51+
assert.NoError(t, err)
52+
}
53+
54+
func TestFire_DeliverToSubscriber(t *testing.T) {
55+
synctest.Test(t, func(t *testing.T) {
56+
ctx, cancel := context.WithCancel(t.Context())
57+
hook := NewBrokerHook(ctx, "info")
58+
msgChan := make(MessageChanType, 1)
59+
hook.AddSubscriber(msgChan)
60+
61+
entry := logrus.NewEntry(logrus.New())
62+
entry.Message = "hello broker"
63+
entry.Level = logrus.InfoLevel
64+
assert.NoError(t, hook.Fire(entry))
65+
66+
// Wait for the poll goroutine to drain input and deliver to msgChan.
67+
synctest.Wait()
68+
69+
select {
70+
case msg := <-msgChan:
71+
assert.Contains(t, string(msg), "hello broker")
72+
default:
73+
t.Fatal("message not delivered")
74+
}
75+
76+
cancel() // signal poll to exit
77+
synctest.Wait() // wait for poll goroutine to return
78+
})
79+
}
80+
81+
func TestFire_HighLoad(t *testing.T) {
82+
synctest.Test(t, func(t *testing.T) {
83+
// Construct without starting poll so the input channel stays full.
84+
hook := &BrokerHook{
85+
highLoadTimeout: time.Millisecond,
86+
input: make(chan MessageType, 1),
87+
ctx: t.Context(),
88+
level: "info",
89+
mu: &sync.Mutex{},
90+
formatter: defaultFormatter,
91+
}
92+
hook.input <- "fill" // fill to capacity; no poll goroutine to drain it
93+
94+
entry := logrus.NewEntry(logrus.New())
95+
entry.Message = "to be dropped"
96+
// The test goroutine is the only goroutine in the bubble.
97+
// Fire blocks on time.After; fake time advances and takes the drop branch.
98+
assert.NoError(t, hook.Fire(entry))
99+
})
100+
}
101+
102+
func TestSend_NoSubscribersIsNoop(t *testing.T) {
103+
hook := NewBrokerHook(t.Context(), "info")
104+
// No subscribers — send must return without blocking or panicking.
105+
hook.send("msg")
106+
}
107+
108+
func TestSend_FullSubscriberDrops(t *testing.T) {
109+
hook := NewBrokerHook(t.Context(), "info")
110+
// Unbuffered channel with no reader — send must take the default branch and not block.
111+
msgChan := make(MessageChanType)
112+
hook.AddSubscriber(msgChan)
113+
hook.send("dropped")
114+
}
115+
116+
func TestSetBrokerFormatter_NilResetsDefault(t *testing.T) {
117+
hook := NewBrokerHook(t.Context(), "info")
118+
hook.SetBrokerFormatter(&logrus.JSONFormatter{})
119+
hook.SetBrokerFormatter(nil)
120+
assert.Equal(t, defaultFormatter, hook.formatter)
121+
}
122+
123+
func TestLevels(t *testing.T) {
124+
cases := []struct {
125+
level string
126+
expected []logrus.Level
127+
}{
128+
{"none", []logrus.Level{}},
129+
{"debug", logrus.AllLevels},
130+
{"info", []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel, logrus.InfoLevel}},
131+
{"error", []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel}},
132+
}
133+
for _, tc := range cases {
134+
hook := NewBrokerHook(t.Context(), tc.level)
135+
assert.Equal(t, tc.expected, hook.Levels(), "level=%s", tc.level)
136+
}
137+
}

0 commit comments

Comments
 (0)