-
Notifications
You must be signed in to change notification settings - Fork 534
protocol/packp: sideband muxer and demuxer #143
Changes from 7 commits
4da9aac
55455d9
2d5329c
af8c921
4fe3630
c1c3b8c
71ce6e0
7ffc308
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package sideband | ||
|
||
// Type sideband type "side-band" or "side-band-64k" | ||
type Type int8 | ||
|
||
const ( | ||
// Sideband legacy sideband type up to 1000-byte messages | ||
Sideband Type = iota | ||
// Sideband64k sideband type up to 65519-byte messages | ||
Sideband64k Type = iota | ||
|
||
// MaxPackedSize for Sideband type | ||
MaxPackedSize = 1000 | ||
// MaxPackedSize64k for Sideband64k type | ||
MaxPackedSize64k = 65520 | ||
) | ||
|
||
// Channel sideband channel | ||
type Channel byte | ||
|
||
// WithPayload encode the payload as a message | ||
func (ch Channel) WithPayload(payload []byte) []byte { | ||
return append([]byte{byte(ch)}, payload...) | ||
} | ||
|
||
const ( | ||
// PackData packfile content | ||
PackData Channel = 1 | ||
// ProgressMessage progress messages | ||
ProgressMessage Channel = 2 | ||
// ErrorMessage fatal error message just before stream aborts | ||
ErrorMessage Channel = 3 | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package sideband | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"fmt" | ||
"io" | ||
|
||
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline" | ||
) | ||
|
||
// ErrMaxPackedExceeded returned by Read, if the maximum packed size is exceeded | ||
var ErrMaxPackedExceeded = errors.New("max. packed size exceeded") | ||
|
||
// Progress allows to read the progress information | ||
type Progress interface { | ||
io.Reader | ||
} | ||
|
||
// Demuxer demultiplex the progress reports and error info interleaved with the | ||
// packfile itself. | ||
// | ||
// A sideband has three diferent channels the main one call PackData contains | ||
// the packfile data, the ErrorMessage channel, that contains server errors and | ||
// the last one ProgressMessage channel containing information about the ongoing | ||
// tast happening in the server (optinal, can be suppressed sending NoProgress | ||
// or Quiet capabilities to the server) | ||
// | ||
// In order to demultiplex the data stream, method `Read` should be called to | ||
// retrieve the PackData channel, the incoming data from the ProgressMessage is | ||
// stored and can be read from `Progress` field, if any message is retrieved | ||
// from the ErrorMessage channel an error is returned and we can assume that the | ||
// conection has been closed. | ||
type Demuxer struct { | ||
t Type | ||
r io.Reader | ||
s *pktline.Scanner | ||
|
||
max int | ||
pending []byte | ||
|
||
// Progress contains progress information | ||
Progress Progress | ||
} | ||
|
||
// NewDemuxer returns a new Demuxer for the given t and read from r | ||
func NewDemuxer(t Type, r io.Reader) *Demuxer { | ||
max := MaxPackedSize64k | ||
if t == Sideband { | ||
max = MaxPackedSize | ||
} | ||
|
||
return &Demuxer{ | ||
t: t, | ||
r: r, | ||
max: max, | ||
s: pktline.NewScanner(r), | ||
Progress: bytes.NewBuffer(nil), | ||
} | ||
} | ||
|
||
// Read reads up to len(p) bytes from the PackData channel into p, an error can | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this description is not exact: I think this method reads from the 3 channels, not just the PackData channel. I think this requires a better explanation, as there are 3 channels but only 2 readers. I would write a small table explaining how to read from each channel and how to know if there is data in any of them, maybe as an explanation to the package. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You have a more detailed explanation of Demux at the struct, I think is clear enough, this function (specially for the user), only reads Packdata into p |
||
// be return if an error happends when reading or if a message is sent in the | ||
// ErrorMessage channel. | ||
// | ||
// If a ProgressMessage is read, it won't be copied to b. Instead of this, it is | ||
// stored and can be read through the reader Progress. If the n value returned | ||
// is zero, err will be nil unless an error reading happens. | ||
func (d *Demuxer) Read(b []byte) (n int, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rewrite the second paragraph. An example:
|
||
var read, req int | ||
|
||
req = len(b) | ||
for read < req { | ||
n, err := d.doRead(b[read:req]) | ||
read += n | ||
|
||
if err == io.EOF { | ||
break | ||
} | ||
|
||
if err != nil { | ||
return read, err | ||
} | ||
} | ||
|
||
return read, nil | ||
} | ||
|
||
func (d *Demuxer) doRead(b []byte) (int, error) { | ||
read, err := d.nextPackData() | ||
size := len(read) | ||
wanted := len(b) | ||
|
||
if size > wanted { | ||
d.pending = read[wanted:] | ||
} | ||
|
||
if wanted > size { | ||
wanted = size | ||
} | ||
|
||
size = copy(b, read[:wanted]) | ||
return size, err | ||
} | ||
|
||
func (d *Demuxer) nextPackData() ([]byte, error) { | ||
content := d.getPending() | ||
if len(content) != 0 { | ||
return content, nil | ||
} | ||
|
||
if !d.s.Scan() { | ||
return nil, io.EOF | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if there is no more input, shouldn't the scanner error be checked instead of always returning io.EOF? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And Error is always nil, when Scan returns There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No,
We should definitely check the scanner error in this case and return This is a similar behavior as |
||
} | ||
|
||
content = d.s.Bytes() | ||
err := d.s.Err() | ||
|
||
size := len(content) | ||
if size == 0 { | ||
return nil, err | ||
} else if size > d.max { | ||
return nil, ErrMaxPackedExceeded | ||
} | ||
|
||
switch Channel(content[0]) { | ||
case PackData: | ||
return content[1:], err | ||
case ProgressMessage: | ||
_, err := d.Progress.(io.Writer).Write(content[1:]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Progress needs to be a Writer, shouldn't it be defined as io.ReadWriter instead of just io.Reader? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is preventing being written by external sources, also I can to declare the intent that should be used for reading There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return nil, err | ||
case ErrorMessage: | ||
return nil, fmt.Errorf("unexpected error: %s", content[1:]) | ||
default: | ||
return nil, fmt.Errorf("unknown channel %s", content) | ||
} | ||
} | ||
|
||
func (d *Demuxer) getPending() (b []byte) { | ||
if len(d.pending) == 0 { | ||
return nil | ||
} | ||
|
||
content := d.pending | ||
d.pending = nil | ||
|
||
return content | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package sideband | ||
|
||
import ( | ||
"bytes" | ||
"io/ioutil" | ||
"testing" | ||
|
||
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. import group order There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bad import order? |
||
|
||
"io" | ||
|
||
. "gopkg.in/check.v1" | ||
) | ||
|
||
func Test(t *testing.T) { TestingT(t) } | ||
|
||
type SidebandSuite struct{} | ||
|
||
var _ = Suite(&SidebandSuite{}) | ||
|
||
func (s *SidebandSuite) TestDecode(c *C) { | ||
expected := []byte("abcdefghijklmnopqrstuvwxyz") | ||
|
||
buf := bytes.NewBuffer(nil) | ||
e := pktline.NewEncoder(buf) | ||
e.Encode(PackData.WithPayload(expected[0:8])) | ||
e.Encode(PackData.WithPayload(expected[8:16])) | ||
e.Encode(PackData.WithPayload(expected[16:26])) | ||
|
||
content := make([]byte, 26) | ||
d := NewDemuxer(Sideband64k, buf) | ||
n, err := io.ReadFull(d, content) | ||
c.Assert(err, IsNil) | ||
c.Assert(n, Equals, 26) | ||
c.Assert(content, DeepEquals, expected) | ||
} | ||
|
||
func (s *SidebandSuite) TestDecodeWithError(c *C) { | ||
expected := []byte("abcdefghijklmnopqrstuvwxyz") | ||
|
||
buf := bytes.NewBuffer(nil) | ||
e := pktline.NewEncoder(buf) | ||
e.Encode(PackData.WithPayload(expected[0:8])) | ||
e.Encode(ErrorMessage.WithPayload([]byte{'F', 'O', 'O', '\n'})) | ||
e.Encode(PackData.WithPayload(expected[8:16])) | ||
e.Encode(PackData.WithPayload(expected[16:26])) | ||
|
||
content := make([]byte, 26) | ||
d := NewDemuxer(Sideband64k, buf) | ||
n, err := io.ReadFull(d, content) | ||
c.Assert(err, ErrorMatches, "unexpected error: FOO\n") | ||
c.Assert(n, Equals, 8) | ||
c.Assert(content[0:8], DeepEquals, expected[0:8]) | ||
} | ||
|
||
func (s *SidebandSuite) TestDecodeWithProgress(c *C) { | ||
expected := []byte("abcdefghijklmnopqrstuvwxyz") | ||
|
||
buf := bytes.NewBuffer(nil) | ||
e := pktline.NewEncoder(buf) | ||
e.Encode(PackData.WithPayload(expected[0:8])) | ||
e.Encode(ProgressMessage.WithPayload([]byte{'F', 'O', 'O', '\n'})) | ||
e.Encode(PackData.WithPayload(expected[8:16])) | ||
e.Encode(PackData.WithPayload(expected[16:26])) | ||
|
||
content := make([]byte, 26) | ||
d := NewDemuxer(Sideband64k, buf) | ||
n, err := io.ReadFull(d, content) | ||
c.Assert(err, IsNil) | ||
c.Assert(n, Equals, 26) | ||
c.Assert(content, DeepEquals, expected) | ||
|
||
progress, err := ioutil.ReadAll(d.Progress) | ||
c.Assert(err, IsNil) | ||
c.Assert(progress, DeepEquals, []byte{'F', 'O', 'O', '\n'}) | ||
} | ||
|
||
func (s *SidebandSuite) TestDecodeWithUnknownChannel(c *C) { | ||
|
||
buf := bytes.NewBuffer(nil) | ||
e := pktline.NewEncoder(buf) | ||
e.Encode([]byte{'4', 'F', 'O', 'O', '\n'}) | ||
|
||
content := make([]byte, 26) | ||
d := NewDemuxer(Sideband64k, buf) | ||
n, err := io.ReadFull(d, content) | ||
c.Assert(err, ErrorMatches, "unknown channel 4FOO\n") | ||
c.Assert(n, Equals, 0) | ||
} | ||
|
||
func (s *SidebandSuite) TestDecodeWithPending(c *C) { | ||
expected := []byte("abcdefghijklmnopqrstuvwxyz") | ||
|
||
buf := bytes.NewBuffer(nil) | ||
e := pktline.NewEncoder(buf) | ||
e.Encode(PackData.WithPayload(expected[0:8])) | ||
e.Encode(PackData.WithPayload(expected[8:16])) | ||
e.Encode(PackData.WithPayload(expected[16:26])) | ||
|
||
content := make([]byte, 13) | ||
d := NewDemuxer(Sideband64k, buf) | ||
n, err := io.ReadFull(d, content) | ||
c.Assert(err, IsNil) | ||
c.Assert(n, Equals, 13) | ||
c.Assert(content, DeepEquals, expected[0:13]) | ||
|
||
n, err = d.Read(content) | ||
c.Assert(err, IsNil) | ||
c.Assert(n, Equals, 13) | ||
c.Assert(content, DeepEquals, expected[13:26]) | ||
} | ||
|
||
func (s *SidebandSuite) TestDecodeErrMaxPacked(c *C) { | ||
buf := bytes.NewBuffer(nil) | ||
e := pktline.NewEncoder(buf) | ||
e.Encode(PackData.WithPayload(bytes.Repeat([]byte{'0'}, MaxPackedSize+1))) | ||
|
||
content := make([]byte, 13) | ||
d := NewDemuxer(Sideband, buf) | ||
n, err := io.ReadFull(d, content) | ||
c.Assert(err, Equals, ErrMaxPackedExceeded) | ||
c.Assert(n, Equals, 0) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
// Package sideband implements a sideband mutiplex/demultiplexer | ||
package sideband | ||
|
||
// If 'side-band' or 'side-band-64k' capabilities have been specified by | ||
// the client, the server will send the packfile data multiplexed. | ||
// | ||
// Either mode indicates that the packfile data will be streamed broken | ||
// up into packets of up to either 1000 bytes in the case of 'side_band', | ||
// or 65520 bytes in the case of 'side_band_64k'. Each packet is made up | ||
// of a leading 4-byte pkt-line length of how much data is in the packet, | ||
// followed by a 1-byte stream code, followed by the actual data. | ||
// | ||
// The stream code can be one of: | ||
// | ||
// 1 - pack data | ||
// 2 - progress messages | ||
// 3 - fatal error message just before stream aborts | ||
// | ||
// The "side-band-64k" capability came about as a way for newer clients | ||
// that can handle much larger packets to request packets that are | ||
// actually crammed nearly full, while maintaining backward compatibility | ||
// for the older clients. | ||
// | ||
// Further, with side-band and its up to 1000-byte messages, it's actually | ||
// 999 bytes of payload and 1 byte for the stream code. With side-band-64k, | ||
// same deal, you have up to 65519 bytes of data and 1 byte for the stream | ||
// code. | ||
// | ||
// The client MUST send only maximum of one of "side-band" and "side- | ||
// band-64k". Server MUST diagnose it as an error if client requests | ||
// both. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra space