Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

Commit bd3dd4d

Browse files
authored
protocol/packp: sideband muxer and demuxer (#143)
* format/pakp: sideband demuxer * format/pakp: sideband muxer * format/pakp: sideband demuxer and muxer * protocol/pakp: sideband demuxer and muxer * documentation and improvements * improvements * handle scan errors properly
1 parent b0d756c commit bd3dd4d

File tree

6 files changed

+466
-0
lines changed

6 files changed

+466
-0
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package sideband
2+
3+
// Type sideband type "side-band" or "side-band-64k"
4+
type Type int8
5+
6+
const (
7+
// Sideband legacy sideband type up to 1000-byte messages
8+
Sideband Type = iota
9+
// Sideband64k sideband type up to 65519-byte messages
10+
Sideband64k Type = iota
11+
12+
// MaxPackedSize for Sideband type
13+
MaxPackedSize = 1000
14+
// MaxPackedSize64k for Sideband64k type
15+
MaxPackedSize64k = 65520
16+
)
17+
18+
// Channel sideband channel
19+
type Channel byte
20+
21+
// WithPayload encode the payload as a message
22+
func (ch Channel) WithPayload(payload []byte) []byte {
23+
return append([]byte{byte(ch)}, payload...)
24+
}
25+
26+
const (
27+
// PackData packfile content
28+
PackData Channel = 1
29+
// ProgressMessage progress messages
30+
ProgressMessage Channel = 2
31+
// ErrorMessage fatal error message just before stream aborts
32+
ErrorMessage Channel = 3
33+
)
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package sideband
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"fmt"
7+
"io"
8+
9+
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
10+
)
11+
12+
// ErrMaxPackedExceeded returned by Read, if the maximum packed size is exceeded
13+
var ErrMaxPackedExceeded = errors.New("max. packed size exceeded")
14+
15+
// Progress allows to read the progress information
16+
type Progress interface {
17+
io.Reader
18+
}
19+
20+
// Demuxer demultiplex the progress reports and error info interleaved with the
21+
// packfile itself.
22+
//
23+
// A sideband has three diferent channels the main one call PackData contains
24+
// the packfile data, the ErrorMessage channel, that contains server errors and
25+
// the last one ProgressMessage channel containing information about the ongoing
26+
// tast happening in the server (optinal, can be suppressed sending NoProgress
27+
// or Quiet capabilities to the server)
28+
//
29+
// In order to demultiplex the data stream, method `Read` should be called to
30+
// retrieve the PackData channel, the incoming data from the ProgressMessage is
31+
// stored and can be read from `Progress` field, if any message is retrieved
32+
// from the ErrorMessage channel an error is returned and we can assume that the
33+
// conection has been closed.
34+
type Demuxer struct {
35+
t Type
36+
r io.Reader
37+
s *pktline.Scanner
38+
39+
max int
40+
pending []byte
41+
42+
// Progress contains progress information
43+
Progress Progress
44+
}
45+
46+
// NewDemuxer returns a new Demuxer for the given t and read from r
47+
func NewDemuxer(t Type, r io.Reader) *Demuxer {
48+
max := MaxPackedSize64k
49+
if t == Sideband {
50+
max = MaxPackedSize
51+
}
52+
53+
return &Demuxer{
54+
t: t,
55+
r: r,
56+
max: max,
57+
s: pktline.NewScanner(r),
58+
Progress: bytes.NewBuffer(nil),
59+
}
60+
}
61+
62+
// Read reads up to len(p) bytes from the PackData channel into p, an error can
63+
// be return if an error happends when reading or if a message is sent in the
64+
// ErrorMessage channel.
65+
//
66+
// If a ProgressMessage is read, it won't be copied to b. Instead of this, it is
67+
// stored and can be read through the reader Progress. If the n value returned
68+
// is zero, err will be nil unless an error reading happens.
69+
func (d *Demuxer) Read(b []byte) (n int, err error) {
70+
var read, req int
71+
72+
req = len(b)
73+
for read < req {
74+
n, err := d.doRead(b[read:req])
75+
read += n
76+
77+
if err != nil {
78+
return read, err
79+
}
80+
}
81+
82+
return read, nil
83+
}
84+
85+
func (d *Demuxer) doRead(b []byte) (int, error) {
86+
read, err := d.nextPackData()
87+
size := len(read)
88+
wanted := len(b)
89+
90+
if size > wanted {
91+
d.pending = read[wanted:]
92+
}
93+
94+
if wanted > size {
95+
wanted = size
96+
}
97+
98+
size = copy(b, read[:wanted])
99+
return size, err
100+
}
101+
102+
func (d *Demuxer) nextPackData() ([]byte, error) {
103+
content := d.getPending()
104+
if len(content) != 0 {
105+
return content, nil
106+
}
107+
108+
if !d.s.Scan() {
109+
if err := d.s.Err(); err != nil {
110+
return nil, err
111+
}
112+
113+
return nil, io.EOF
114+
}
115+
116+
content = d.s.Bytes()
117+
118+
size := len(content)
119+
if size == 0 {
120+
return nil, nil
121+
} else if size > d.max {
122+
return nil, ErrMaxPackedExceeded
123+
}
124+
125+
switch Channel(content[0]) {
126+
case PackData:
127+
return content[1:], nil
128+
case ProgressMessage:
129+
_, err := d.Progress.(io.Writer).Write(content[1:])
130+
return nil, err
131+
case ErrorMessage:
132+
return nil, fmt.Errorf("unexpected error: %s", content[1:])
133+
default:
134+
return nil, fmt.Errorf("unknown channel %s", content)
135+
}
136+
}
137+
138+
func (d *Demuxer) getPending() (b []byte) {
139+
if len(d.pending) == 0 {
140+
return nil
141+
}
142+
143+
content := d.pending
144+
d.pending = nil
145+
146+
return content
147+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package sideband
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"io"
7+
"io/ioutil"
8+
"testing"
9+
10+
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
11+
12+
. "gopkg.in/check.v1"
13+
)
14+
15+
func Test(t *testing.T) { TestingT(t) }
16+
17+
type SidebandSuite struct{}
18+
19+
var _ = Suite(&SidebandSuite{})
20+
21+
func (s *SidebandSuite) TestDecode(c *C) {
22+
expected := []byte("abcdefghijklmnopqrstuvwxyz")
23+
24+
buf := bytes.NewBuffer(nil)
25+
e := pktline.NewEncoder(buf)
26+
e.Encode(PackData.WithPayload(expected[0:8]))
27+
e.Encode(PackData.WithPayload(expected[8:16]))
28+
e.Encode(PackData.WithPayload(expected[16:26]))
29+
30+
content := make([]byte, 26)
31+
d := NewDemuxer(Sideband64k, buf)
32+
n, err := io.ReadFull(d, content)
33+
c.Assert(err, IsNil)
34+
c.Assert(n, Equals, 26)
35+
c.Assert(content, DeepEquals, expected)
36+
}
37+
38+
func (s *SidebandSuite) TestDecodeMoreThanContain(c *C) {
39+
expected := []byte("abcdefghijklmnopqrstuvwxyz")
40+
41+
buf := bytes.NewBuffer(nil)
42+
e := pktline.NewEncoder(buf)
43+
e.Encode(PackData.WithPayload(expected))
44+
45+
content := make([]byte, 42)
46+
d := NewDemuxer(Sideband64k, buf)
47+
n, err := io.ReadFull(d, content)
48+
c.Assert(err, Equals, io.ErrUnexpectedEOF)
49+
c.Assert(n, Equals, 26)
50+
c.Assert(content[0:26], DeepEquals, expected)
51+
}
52+
53+
func (s *SidebandSuite) TestDecodeWithError(c *C) {
54+
expected := []byte("abcdefghijklmnopqrstuvwxyz")
55+
56+
buf := bytes.NewBuffer(nil)
57+
e := pktline.NewEncoder(buf)
58+
e.Encode(PackData.WithPayload(expected[0:8]))
59+
e.Encode(ErrorMessage.WithPayload([]byte{'F', 'O', 'O', '\n'}))
60+
e.Encode(PackData.WithPayload(expected[8:16]))
61+
e.Encode(PackData.WithPayload(expected[16:26]))
62+
63+
content := make([]byte, 26)
64+
d := NewDemuxer(Sideband64k, buf)
65+
n, err := io.ReadFull(d, content)
66+
c.Assert(err, ErrorMatches, "unexpected error: FOO\n")
67+
c.Assert(n, Equals, 8)
68+
c.Assert(content[0:8], DeepEquals, expected[0:8])
69+
}
70+
71+
type mockReader struct{}
72+
73+
func (r *mockReader) Read([]byte) (int, error) { return 0, errors.New("foo") }
74+
75+
func (s *SidebandSuite) TestDecodeFromFailingReader(c *C) {
76+
content := make([]byte, 26)
77+
d := NewDemuxer(Sideband64k, &mockReader{})
78+
n, err := io.ReadFull(d, content)
79+
c.Assert(err, ErrorMatches, "foo")
80+
c.Assert(n, Equals, 0)
81+
}
82+
83+
func (s *SidebandSuite) TestDecodeWithProgress(c *C) {
84+
expected := []byte("abcdefghijklmnopqrstuvwxyz")
85+
86+
buf := bytes.NewBuffer(nil)
87+
e := pktline.NewEncoder(buf)
88+
e.Encode(PackData.WithPayload(expected[0:8]))
89+
e.Encode(ProgressMessage.WithPayload([]byte{'F', 'O', 'O', '\n'}))
90+
e.Encode(PackData.WithPayload(expected[8:16]))
91+
e.Encode(PackData.WithPayload(expected[16:26]))
92+
93+
content := make([]byte, 26)
94+
d := NewDemuxer(Sideband64k, buf)
95+
n, err := io.ReadFull(d, content)
96+
c.Assert(err, IsNil)
97+
c.Assert(n, Equals, 26)
98+
c.Assert(content, DeepEquals, expected)
99+
100+
progress, err := ioutil.ReadAll(d.Progress)
101+
c.Assert(err, IsNil)
102+
c.Assert(progress, DeepEquals, []byte{'F', 'O', 'O', '\n'})
103+
}
104+
105+
func (s *SidebandSuite) TestDecodeWithUnknownChannel(c *C) {
106+
107+
buf := bytes.NewBuffer(nil)
108+
e := pktline.NewEncoder(buf)
109+
e.Encode([]byte{'4', 'F', 'O', 'O', '\n'})
110+
111+
content := make([]byte, 26)
112+
d := NewDemuxer(Sideband64k, buf)
113+
n, err := io.ReadFull(d, content)
114+
c.Assert(err, ErrorMatches, "unknown channel 4FOO\n")
115+
c.Assert(n, Equals, 0)
116+
}
117+
118+
func (s *SidebandSuite) TestDecodeWithPending(c *C) {
119+
expected := []byte("abcdefghijklmnopqrstuvwxyz")
120+
121+
buf := bytes.NewBuffer(nil)
122+
e := pktline.NewEncoder(buf)
123+
e.Encode(PackData.WithPayload(expected[0:8]))
124+
e.Encode(PackData.WithPayload(expected[8:16]))
125+
e.Encode(PackData.WithPayload(expected[16:26]))
126+
127+
content := make([]byte, 13)
128+
d := NewDemuxer(Sideband64k, buf)
129+
n, err := io.ReadFull(d, content)
130+
c.Assert(err, IsNil)
131+
c.Assert(n, Equals, 13)
132+
c.Assert(content, DeepEquals, expected[0:13])
133+
134+
n, err = d.Read(content)
135+
c.Assert(err, IsNil)
136+
c.Assert(n, Equals, 13)
137+
c.Assert(content, DeepEquals, expected[13:26])
138+
}
139+
140+
func (s *SidebandSuite) TestDecodeErrMaxPacked(c *C) {
141+
buf := bytes.NewBuffer(nil)
142+
e := pktline.NewEncoder(buf)
143+
e.Encode(PackData.WithPayload(bytes.Repeat([]byte{'0'}, MaxPackedSize+1)))
144+
145+
content := make([]byte, 13)
146+
d := NewDemuxer(Sideband, buf)
147+
n, err := io.ReadFull(d, content)
148+
c.Assert(err, Equals, ErrMaxPackedExceeded)
149+
c.Assert(n, Equals, 0)
150+
151+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Package sideband implements a sideband mutiplex/demultiplexer
2+
package sideband
3+
4+
// If 'side-band' or 'side-band-64k' capabilities have been specified by
5+
// the client, the server will send the packfile data multiplexed.
6+
//
7+
// Either mode indicates that the packfile data will be streamed broken
8+
// up into packets of up to either 1000 bytes in the case of 'side_band',
9+
// or 65520 bytes in the case of 'side_band_64k'. Each packet is made up
10+
// of a leading 4-byte pkt-line length of how much data is in the packet,
11+
// followed by a 1-byte stream code, followed by the actual data.
12+
//
13+
// The stream code can be one of:
14+
//
15+
// 1 - pack data
16+
// 2 - progress messages
17+
// 3 - fatal error message just before stream aborts
18+
//
19+
// The "side-band-64k" capability came about as a way for newer clients
20+
// that can handle much larger packets to request packets that are
21+
// actually crammed nearly full, while maintaining backward compatibility
22+
// for the older clients.
23+
//
24+
// Further, with side-band and its up to 1000-byte messages, it's actually
25+
// 999 bytes of payload and 1 byte for the stream code. With side-band-64k,
26+
// same deal, you have up to 65519 bytes of data and 1 byte for the stream
27+
// code.
28+
//
29+
// The client MUST send only maximum of one of "side-band" and "side-
30+
// band-64k". Server MUST diagnose it as an error if client requests
31+
// both.

0 commit comments

Comments
 (0)