Skip to content
Merged
20 changes: 19 additions & 1 deletion internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ const (
serverSide
)

// maxWriteBufSize is the maximum length (number of elements) the cached
// writeBuf can grow to. The length depends on the number of buffers
// contained within the BufferSlice produced by the codec, which is
// generally small.
//
// If a writeBuf larger than this limit is required, it will be allocated
// and freed after use, rather than being cached. This avoids holding
// on to large amounts of memory.
const maxWriteBufSize = 64

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
Expand Down Expand Up @@ -530,7 +540,8 @@ type loopyWriter struct {

// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
writeBuf [][]byte

writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
Expand Down Expand Up @@ -1010,6 +1021,8 @@ func (l *loopyWriter) processData() (bool, error) {
if err != nil {
// This must never happen since the reader must have at least dSize
// bytes.
clear(l.writeBuf)
l.writeBuf = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is impossible then:

  1. logger.Error seems like a good idea, unless the caller already does that with what we return..
  2. We probably don't need to bother with the clear/nil (and surely don't want to do both?)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an error log and removed the buffer resetting.

return false, err
}
}
Expand All @@ -1026,6 +1039,11 @@ func (l *loopyWriter) processData() (bool, error) {
}
err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
reader.Discard(dSize)
if cap(l.writeBuf) > maxWriteBufSize {
l.writeBuf = nil
} else {
clear(l.writeBuf)
}
if err != nil {
return false, err
}
Expand Down
3 changes: 0 additions & 3 deletions internal/transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,6 @@ func (f *framer) writeData(streamID uint32, endStream bool, data [][]byte) error
return err
}
for _, d := range data {
if len(d) == 0 {
continue
}
if _, err := f.writer.Write(d); err != nil {
return err
}
Expand Down