Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

protocol/packp: sideband muxer and demuxer #143

Merged
merged 8 commits into from
Nov 30, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions plumbing/protocol/packp/sideband/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sideband

// Type sideband type "side-band" or "side-band-64k"
type Type int8

const (
// Sideband legacy sideband type up to 1000-byte messages
Sideband Type = iota
// Sideband64k sideband type up to 65519-byte messages
Sideband64k Type = iota

// MaxPackedSize for Sideband type
MaxPackedSize = 1000
// MaxPackedSize64k for Sideband64k type
MaxPackedSize64k = 65520
)

// Channel sideband channel
type Channel byte

// Bytes returns the channel as an slice of bytes
func (ch Channel) Bytes() []byte {
return []byte{byte(ch)}
}

const (
// PackData packfile content
PackData Channel = 1
// ProgressMessage progress messages
ProgressMessage Channel = 2
// ErrorMessage fatal error message just before stream aborts
ErrorMessage Channel = 3
)
148 changes: 148 additions & 0 deletions plumbing/protocol/packp/sideband/demux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package sideband

import (
"bytes"
"errors"
"fmt"
"io"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space

"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
)

// ErrMaxPackedExceeded returned by Read, if the maximum packed size is exceeded
var ErrMaxPackedExceeded = errors.New("max. packed size exceeded")

// Progress allows to read the progress information
type Progress interface {
io.Reader
}

// Demuxer demultiplex the progress reports and error info interleaved with the
// packfile itself.
//
// A sideband has three diferent channels the main one call PackData contains
// the packfile data, the ErrorMessage channel, that contains server errors and
// the last one ProgressMessage channel containing information about the ongoing
// tast happening in the server (optinal, can be suppressed sending NoProgress
// or Quiet capabilities to the server)
//
// In order to demultiplex the data stream, method `Read` should be called to
// retrieve the PackData channel, the incoming data from the ProgressMessage is
// stored and can be read from `Progress` field, if any message is retrieved
// from the ErrorMessage channel an error is returned and we can assume that the
// conection has been closed.
type Demuxer struct {
t Type
r io.Reader
s *pktline.Scanner

max int
pending []byte

// Progress contains progress information
Progress Progress
}

// NewDemuxer returns a new Demuxer for the given t and read from r
func NewDemuxer(t Type, r io.Reader) *Demuxer {
max := MaxPackedSize64k
if t == Sideband {
max = MaxPackedSize
}

return &Demuxer{
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
Progress: bytes.NewBuffer(nil),
}
}

// Read reads up to len(p) bytes from the PackData channel into p, an error can
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this description is not exact: I think this method reads from the 3 channels, not just the PackData channel.

I think this requires a better explanation, as there are 3 channels but only 2 readers. I would write a small table explaining how to read from each channel and how to know if there is data in any of them, maybe as an explanation to the package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a more detailed explanation of Demux at the struct, I think is clear enough, this function (specially for the user), only reads Packdata into p

// be return if an error happends when reading or if a message is sent in the
// ErrorMessage channel.
//
// If a ProgressMessage is read, it won't be copied to b. Instead of this, it is
// stored and can be read through the reader Progress. If the n value returned
// is zero, err will be nil unless an error reading happens.
func (d *Demuxer) Read(b []byte) (n int, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewrite the second paragraph. An example:

If a ProgressMessage is read, it won't be copied to b. Instead of this, it is stored and can be read through the reader Progress. If the n value returned is zero, err will be nil unless an error reading happens.

var read, req int

req = len(b)
for read < req {
n, err := d.doRead(b[read:req])
read += n

if err == io.EOF {
break
}

if err != nil {
return read, err
}
}

return read, nil
}

func (d *Demuxer) doRead(b []byte) (int, error) {
read, err := d.nextPackData()
size := len(read)
wanted := len(b)

if size > wanted {
d.pending = read[wanted:size]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simplify read[wanted:size] to read[wanted:].

}

if wanted > size {
wanted = size
}

size = copy(b, read[:wanted])
return size, err
}

func (d *Demuxer) nextPackData() ([]byte, error) {
content := d.readPending()
if len(content) != 0 {
return content, nil
}

if !d.s.Scan() {
return nil, io.EOF
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is no more input, shouldn't the scanner error be checked instead of always returning io.EOF?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And Error is always nil, when Scan returns io.EOF @alcortesm

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, Error() is not always nil and Scan() does not return io.EOF, it returns a boolean instead: from pktline.Scanner.Scan(), plumbing/format/pktline/scanner.go:44 :

// Scan advances the Scanner to the next pkt-line, whose payload will
// then be available through the Bytes method.  Scanning stops at EOF
// or the first I/O error.  After Scan returns false, the Err method
// will return any error that occurred during scanning, except that if
// it was io.EOF, Err will return nil.
func (s *Scanner) Scan() bool {

We should definitely check the scanner error in this case and return io.EOF if the error is nil.

This is a similar behavior as bufio.Scanner, see https://golang.org/pkg/bufio/#example_Scanner_lines for example.

}

content = d.s.Bytes()
err := d.s.Err()

size := len(content)
if size == 0 {
return nil, err
} else if size > d.max {
return nil, ErrMaxPackedExceeded
}

switch Channel(content[0]) {
case PackData:
return content[1:], err
case ProgressMessage:
_, err := d.Progress.(io.Writer).Write(content[1:])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Progress needs to be a Writer, shouldn't it be defined as io.ReadWriter instead of just io.Reader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is preventing being written by external sources, also I can to declare the intent that should be used for reading

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return nil, err
case ErrorMessage:
return nil, fmt.Errorf("unexpected error: %s", content[1:])
default:
return nil, fmt.Errorf("unknown channel %s", content)
}
}

func (d *Demuxer) readPending() (b []byte) {
if len(d.pending) == 0 {
return nil
}

content := d.pending
d.pending = make([]byte, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change this to:

d.pending =  nil

to avoid allocating memory that will be thrown away when size > wanted in doRead.

This is convoluted and a little confusing, as the whole function does not really read and it is just two lines:

content := d.pending
d.pending = nil

And then we have to reslice in the calling function. Better remove the function and reslice directly.


return content
}
122 changes: 122 additions & 0 deletions plumbing/protocol/packp/sideband/demux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package sideband

import (
"bytes"
"io/ioutil"
"testing"

"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import group order

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad import order?


. "gopkg.in/check.v1"
)

func Test(t *testing.T) { TestingT(t) }

type SidebandSuite struct{}

var _ = Suite(&SidebandSuite{})

func (s *SidebandSuite) TestDecode(c *C) {
expected := []byte("abcdefghijklmnopqrstuvwxyz")

buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(append(PackData.Bytes(), expected[0:8]...))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome.

I think you should write this line as an example in the comment of the Channel.Byte() method, so everybody understand, at a single glance, what is its intended use case.

You can also call the method func (c Channel) WithPayload(b []byte) []byte, so its usage will be a little more clear:

e.Encode(PackData.WithPayload(expected[0:8])

And make it return a multireader to save memory.

e.Encode(append(PackData.Bytes(), expected[8:16]...))
e.Encode(append(PackData.Bytes(), expected[16:26]...))

content := make([]byte, 26)
d := NewDemuxer(Sideband64k, buf)
n, err := d.Read(content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change Read to ReadFull.

The same for all the tests.

c.Assert(err, IsNil)
c.Assert(n, Equals, 26)
c.Assert(content, DeepEquals, expected)
}

func (s *SidebandSuite) TestDecodeWithError(c *C) {
expected := []byte("abcdefghijklmnopqrstuvwxyz")

buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(append(PackData.Bytes(), expected[0:8]...))
e.Encode(append(ErrorMessage.Bytes(), 'F', 'O', 'O', '\n'))
e.Encode(append(PackData.Bytes(), expected[8:16]...))
e.Encode(append(PackData.Bytes(), expected[16:26]...))

content := make([]byte, 26)
d := NewDemuxer(Sideband64k, buf)
n, err := d.Read(content)
c.Assert(err, ErrorMatches, "unexepcted error: FOO\n")
c.Assert(n, Equals, 8)
c.Assert(content[0:8], DeepEquals, expected[0:8])
}

func (s *SidebandSuite) TestDecodeWithProgress(c *C) {
expected := []byte("abcdefghijklmnopqrstuvwxyz")

buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(append(PackData.Bytes(), expected[0:8]...))
e.Encode(append(ProgressMessage.Bytes(), 'F', 'O', 'O', '\n'))
e.Encode(append(PackData.Bytes(), expected[8:16]...))
e.Encode(append(PackData.Bytes(), expected[16:26]...))

content := make([]byte, 26)
d := NewDemuxer(Sideband64k, buf)
n, err := d.Read(content)
c.Assert(err, IsNil)
c.Assert(n, Equals, 26)
c.Assert(content, DeepEquals, expected)

progress, err := ioutil.ReadAll(d.Progress)
c.Assert(err, IsNil)
c.Assert(progress, DeepEquals, []byte{'F', 'O', 'O', '\n'})
}

func (s *SidebandSuite) TestDecodeWithUnknownChannel(c *C) {

buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode([]byte{'4', 'F', 'O', 'O', '\n'})

content := make([]byte, 26)
d := NewDemuxer(Sideband64k, buf)
n, err := d.Read(content)
c.Assert(err, ErrorMatches, "unknown channel 4FOO\n")
c.Assert(n, Equals, 0)
}

func (s *SidebandSuite) TestDecodeWithPending(c *C) {
expected := []byte("abcdefghijklmnopqrstuvwxyz")

buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(append(PackData.Bytes(), expected[0:8]...))
e.Encode(append(PackData.Bytes(), expected[8:16]...))
e.Encode(append(PackData.Bytes(), expected[16:26]...))

content := make([]byte, 13)
d := NewDemuxer(Sideband64k, buf)
n, err := d.Read(content)
c.Assert(err, IsNil)
c.Assert(n, Equals, 13)
c.Assert(content, DeepEquals, expected[0:13])

n, err = d.Read(content)
c.Assert(err, IsNil)
c.Assert(n, Equals, 13)
c.Assert(content, DeepEquals, expected[13:26])
}

func (s *SidebandSuite) TestDecodeErrMaxPacked(c *C) {
buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(bytes.Repeat(PackData.Bytes(), MaxPackedSize+1))

content := make([]byte, 13)
d := NewDemuxer(Sideband, buf)
n, err := d.Read(content)
c.Assert(err, Equals, ErrMaxPackedExceeded)
c.Assert(n, Equals, 0)

}
65 changes: 65 additions & 0 deletions plumbing/protocol/packp/sideband/muxer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package sideband

import (
"io"

"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
)

// Muxer multiplex the packfile along with the progress messages and the error
// information. The multiplex is perform using pktline format.
type Muxer struct {
max int
e *pktline.Encoder
}

const chLen = 1

// NewMuxer returns a new Muxer for the given t that writes on w.
//
// If t is equal to `Sideband` the max pack size is set to MaxPackedSize, in any
// other value is given, max pack is set to MaxPackedSize64k, that is the
// maximum lenght of a line in pktline format.
func NewMuxer(t Type, w io.Writer) *Muxer {
max := MaxPackedSize64k
if t == Sideband {
max = MaxPackedSize
}

return &Muxer{
max: max - chLen,
e: pktline.NewEncoder(w),
}
}

// Write writes p in the PackData channel
func (m *Muxer) Write(p []byte) (int, error) {
return m.WriteChannel(PackData, p)
}

// WriteChannel writes p in the given channel. This method can be used with any
// channel, but is recommend use it only for the ProgressMessage and
// ErrorMessage channels and use Write for the PackData channel
func (m *Muxer) WriteChannel(t Channel, p []byte) (int, error) {
wrote := 0
size := len(p)
for wrote < size {
n, err := m.doWrite(t, p[wrote:])
wrote += n

if err != nil {
return wrote, err
}
}

return wrote, nil
}

func (m *Muxer) doWrite(ch Channel, p []byte) (int, error) {
sz := len(p)
if sz > m.max {
sz = m.max
}

return sz, m.e.Encode(append(ch.Bytes(), p[:sz]...))
}
Loading