Skip to content

Commit 49fa6d1

Browse files
committed
Use sync.Pool
Signed-off-by: Saswata Mukherjee <[email protected]>
1 parent 917985a commit 49fa6d1

File tree

1 file changed

+46
-24
lines changed

1 file changed

+46
-24
lines changed

exp/api/remote/remote_api.go

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"path"
2727
"strconv"
2828
"strings"
29+
"sync"
2930
"time"
3031

3132
"github.com/klauspost/compress/snappy"
@@ -40,8 +41,8 @@ import (
4041
type API struct {
4142
baseURL *url.URL
4243

43-
opts apiOpts
44-
reqBuf, comprBuf []byte
44+
opts apiOpts
45+
bufPool sync.Pool
4546
}
4647

4748
// APIOption represents a remote API option.
@@ -119,9 +120,6 @@ func (n nopSlogHandler) WithAttrs([]slog.Attr) slog.Handler { return n }
119120
func (n nopSlogHandler) WithGroup(string) slog.Handler { return n }
120121

121122
// NewAPI returns a new API for the clients of Remote Write Protocol.
122-
//
123-
// It is not safe to use the returned API from multiple goroutines, create a
124-
// separate *API for each goroutine.
125123
func NewAPI(baseURL string, opts ...APIOption) (*API, error) {
126124
parsedURL, err := url.Parse(baseURL)
127125
if err != nil {
@@ -141,10 +139,17 @@ func NewAPI(baseURL string, opts ...APIOption) (*API, error) {
141139

142140
parsedURL.Path = path.Join(parsedURL.Path, o.path)
143141

144-
return &API{
142+
api := &API{
145143
opts: o,
146144
baseURL: parsedURL,
147-
}, nil
145+
bufPool: sync.Pool{
146+
New: func() any {
147+
b := make([]byte, 0, 1024*16) // Initial capacity of 16KB
148+
return &b
149+
},
150+
},
151+
}
152+
return api, nil
148153
}
149154

150155
type retryableError struct {
@@ -176,9 +181,8 @@ type gogoProtoEnabled interface {
176181
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
177182
// error out on unknown scheme.
178183
func (r *API) Write(ctx context.Context, cType WriteContentType, msg any) (_ WriteResponseStats, err error) {
179-
// Reset the buffer.
180-
// remove this buf or add lock
181-
r.reqBuf = r.reqBuf[:0]
184+
buf := r.bufPool.Get().(*[]byte)
185+
defer r.bufPool.Put(buf)
182186

183187
if err := cType.Validate(); err != nil {
184188
return WriteResponseStats{}, err
@@ -189,32 +193,40 @@ func (r *API) Write(ctx context.Context, cType WriteContentType, msg any) (_ Wri
189193
case vtProtoEnabled:
190194
// Use optimized vtprotobuf if supported.
191195
size := m.SizeVT()
192-
if len(r.reqBuf) < size {
193-
r.reqBuf = make([]byte, size)
196+
if cap(*buf) < size {
197+
*buf = make([]byte, size)
198+
} else {
199+
*buf = (*buf)[:size]
194200
}
195-
if _, err := m.MarshalToSizedBufferVT(r.reqBuf[:size]); err != nil {
201+
202+
if _, err := m.MarshalToSizedBufferVT(*buf); err != nil {
196203
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
197204
}
198205
case gogoProtoEnabled:
199206
// Gogo proto if supported.
200207
size := m.Size()
201-
if len(r.reqBuf) < size {
202-
r.reqBuf = make([]byte, size)
208+
if cap(*buf) < size {
209+
*buf = make([]byte, size)
210+
} else {
211+
*buf = (*buf)[:size]
203212
}
204-
if _, err := m.MarshalToSizedBuffer(r.reqBuf[:size]); err != nil {
213+
214+
if _, err := m.MarshalToSizedBuffer(*buf); err != nil {
205215
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
206216
}
207217
case proto.Message:
208218
// Generic proto.
209-
r.reqBuf, err = (proto.MarshalOptions{}).MarshalAppend(r.reqBuf, m)
219+
*buf, err = (proto.MarshalOptions{}).MarshalAppend(*buf, m)
210220
if err != nil {
211221
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
212222
}
213223
default:
214224
return WriteResponseStats{}, fmt.Errorf("unknown message type %T", m)
215225
}
216226

217-
payload, err := compressPayload(&r.comprBuf, r.opts.compression, r.reqBuf)
227+
comprBuf := r.bufPool.Get().(*[]byte)
228+
defer r.bufPool.Put(comprBuf)
229+
payload, err := compressPayload(comprBuf, r.opts.compression, *buf)
218230
if err != nil {
219231
return WriteResponseStats{}, fmt.Errorf("compressing %w", err)
220232
}
@@ -270,11 +282,13 @@ func (r *API) Write(ctx context.Context, cType WriteContentType, msg any) (_ Wri
270282
func compressPayload(tmpbuf *[]byte, enc Compression, inp []byte) (compressed []byte, _ error) {
271283
switch enc {
272284
case SnappyBlockCompression:
273-
compressed = snappy.Encode(*tmpbuf, inp)
274-
if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) {
275-
// grow the buffer for the next time.
276-
*tmpbuf = make([]byte, n)
285+
if cap(*tmpbuf) < snappy.MaxEncodedLen(len(inp)) {
286+
*tmpbuf = make([]byte, snappy.MaxEncodedLen(len(inp)))
287+
} else {
288+
*tmpbuf = (*tmpbuf)[:snappy.MaxEncodedLen(len(inp))]
277289
}
290+
291+
compressed = snappy.Encode(*tmpbuf, inp)
278292
return compressed, nil
279293
default:
280294
return compressed, fmt.Errorf("unknown compression scheme [%v]", enc)
@@ -394,6 +408,12 @@ func WithHandlerMiddlewares(middlewares ...func(http.Handler) http.Handler) Hand
394408
// If the request body is not snappy-encoded, it returns an error.
395409
// Used by default in NewRemoteWriteHandler.
396410
func SnappyDecompressorMiddleware(logger *slog.Logger) func(http.Handler) http.Handler {
411+
bufPool := sync.Pool{
412+
New: func() any {
413+
return bytes.NewBuffer(nil)
414+
},
415+
}
416+
397417
return func(next http.Handler) http.Handler {
398418
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
399419
enc := r.Header.Get("Content-Encoding")
@@ -404,8 +424,10 @@ func SnappyDecompressorMiddleware(logger *slog.Logger) func(http.Handler) http.H
404424
return
405425
}
406426

407-
// Read the request body.
408-
bodyBytes, err := io.ReadAll(r.Body)
427+
buf := bufPool.Get().(*bytes.Buffer)
428+
defer bufPool.Put(buf)
429+
430+
bodyBytes, err := io.ReadAll(io.TeeReader(r.Body, buf))
409431
if err != nil {
410432
logger.Error("Error reading request body", "err", err)
411433
http.Error(w, err.Error(), http.StatusBadRequest)

0 commit comments

Comments
 (0)