Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

remote: sideband support #156

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions plumbing/protocol/packp/sideband/demux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sideband

import (
"bytes"
"errors"
"fmt"
"io"
Expand All @@ -15,22 +14,23 @@ var ErrMaxPackedExceeded = errors.New("max. packed size exceeded")
// Progress allows to read the progress information
type Progress interface {
io.Reader
io.Writer
}

// Demuxer demultiplex the progress reports and error info interleaved with the
// packfile itself.
//
// A sideband has three diferent channels the main one call PackData contains
// A sideband has three different channels the main one, call PackData, contains
// the packfile data, the ErrorMessage channel, that contains server errors and
// the last one ProgressMessage channel containing information about the ongoing
// tast happening in the server (optinal, can be suppressed sending NoProgress
// task happening in the server (optional, can be suppressed sending NoProgress
// or Quiet capabilities to the server)
Copy link
Contributor

@alcortesm alcortesm Dec 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

English grammar and typos, consider this alternate version:

Sideband information comes through 3 different channels:
- PackData: the packfile data itself.
- ErrorMessage: server errors.
- ProgressMessage: progress information about the packfile download
  process.  When the NoProgress and/or Quiet capabilities are in use,
  this channel remains silent.

//
// In order to demultiplex the data stream, method `Read` should be called to
// retrieve the PackData channel, the incoming data from the ProgressMessage is
// stored and can be read from `Progress` field, if any message is retrieved
// from the ErrorMessage channel an error is returned and we can assume that the
// conection has been closed.
// written at `Progress` (if any), if any message is retrieved from the
// ErrorMessage channel an error is returned and we can assume that the
// connection has been closed.
type Demuxer struct {
Copy link
Contributor

@alcortesm alcortesm Dec 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing as the Demuxer type does not actually demultiplex the channels and it is not clear how are errors messages sent back to the user. Also a connection is mentioned in the comment, and some assumptions about it being closed, but it is not clear how this can be relevant for anything.

I think a better comment would be something like this:

[Foo] is a reader to the packfile data in the PackData channel.  The progress information
is writen to the the Progress field while reading.  If any message is received through the
ErrorMessage channel, the `Read` method will return it as an error and it will assume the
connection has been closed.

Although I am still not sure about the connection thing.

I would rather use a real demuxer though or even simply a method that returns 3 readers (or even better, 3 scanners): one for each channel, treating them as equal, with the same usage semantics. This will have a much simpler API and will let the user decide how to use each channel.

Another approach would be to use a single scanner, returning channel and data when read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this is not a PR of the sideband demux this was already merged. Your text doesn't explain that the Progress could be optional. I will review the text with the new phrase

t Type
r io.Reader
Expand All @@ -39,7 +39,7 @@ type Demuxer struct {
max int
pending []byte

// Progress contains progress information
// Progress is where the progress messages are stored
Progress Progress
}

Expand All @@ -51,21 +51,19 @@ func NewDemuxer(t Type, r io.Reader) *Demuxer {
}

return &Demuxer{
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
Progress: bytes.NewBuffer(nil),
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
}
}

// Read reads up to len(p) bytes from the PackData channel into p, an error can
// be return if an error happends when reading or if a message is sent in the
// be return if an error happens when reading or if a message is sent in the
// ErrorMessage channel.
//
// If a ProgressMessage is read, it won't be copied to b. Instead of this, it is
// stored and can be read through the reader Progress. If the n value returned
// is zero, err will be nil unless an error reading happens.
// When a ProgressMessage is read, is not copy to b, instead of this is written
// to the Progress
func (d *Demuxer) Read(b []byte) (n int, err error) {
var read, req int

Expand Down Expand Up @@ -126,13 +124,17 @@ func (d *Demuxer) nextPackData() ([]byte, error) {
case PackData:
return content[1:], nil
case ProgressMessage:
_, err := d.Progress.(io.Writer).Write(content[1:])
return nil, err
if d.Progress != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think NewDemuxer should has a Progress as an argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a optional argument, and not the most common behavior, so that's why is not part of the New method

_, err := d.Progress.Write(content[1:])
return nil, err
}
case ErrorMessage:
return nil, fmt.Errorf("unexpected error: %s", content[1:])
default:
return nil, fmt.Errorf("unknown channel %s", content)
}

return nil, nil
}

func (d *Demuxer) getPending() (b []byte) {
Expand Down
3 changes: 3 additions & 0 deletions plumbing/protocol/packp/sideband/demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (s *SidebandSuite) TestDecode(c *C) {
buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(PackData.WithPayload(expected[0:8]))
e.Encode(ProgressMessage.WithPayload([]byte{'F', 'O', 'O', '\n'}))
e.Encode(PackData.WithPayload(expected[8:16]))
e.Encode(PackData.WithPayload(expected[16:26]))

Expand Down Expand Up @@ -92,6 +93,8 @@ func (s *SidebandSuite) TestDecodeWithProgress(c *C) {

content := make([]byte, 26)
d := NewDemuxer(Sideband64k, buf)
d.Progress = bytes.NewBuffer(nil)

n, err := io.ReadFull(d, content)
c.Assert(err, IsNil)
c.Assert(n, Equals, 26)
Expand Down
2 changes: 1 addition & 1 deletion plumbing/protocol/packp/sideband/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const chLen = 1
//
// If t is equal to `Sideband` the max pack size is set to MaxPackedSize, in any
// other value is given, max pack is set to MaxPackedSize64k, that is the
// maximum lenght of a line in pktline format.
// maximum length of a line in pktline format.
func NewMuxer(t Type, w io.Writer) *Muxer {
max := MaxPackedSize64k
if t == Sideband {
Expand Down
6 changes: 6 additions & 0 deletions plumbing/protocol/packp/ulreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func NewUploadRequestFromCapabilities(adv *capability.List) *UploadRequest {
r.Capabilities.Set(capability.MultiACK)
}

if adv.Supports(capability.Sideband64k) {
r.Capabilities.Set(capability.Sideband64k)
} else if adv.Supports(capability.Sideband) {
r.Capabilities.Set(capability.Sideband)
}

if adv.Supports(capability.ThinPack) {
r.Capabilities.Set(capability.ThinPack)
}
Expand Down
2 changes: 1 addition & 1 deletion plumbing/protocol/packp/ulreq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *UlReqSuite) TestNewUploadRequestFromCapabilities(c *C) {

r := NewUploadRequestFromCapabilities(cap)
c.Assert(r.Capabilities.String(), Equals,
"multi_ack_detailed thin-pack ofs-delta agent=go-git/4.x",
"multi_ack_detailed side-band-64k thin-pack ofs-delta agent=go-git/4.x",
)
}

Expand Down
2 changes: 0 additions & 2 deletions plumbing/transport/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ func transformSCPLikeIfNeeded(endpoint string) string {
var UnsupportedCapabilities = []capability.Capability{
capability.MultiACK,
capability.MultiACKDetailed,
capability.Sideband,
capability.Sideband64k,
capability.ThinPack,
}

Expand Down
47 changes: 18 additions & 29 deletions plumbing/transport/http/fetch_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"strconv"

"gopkg.in/src-d/go-git.v4/plumbing"
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
Expand Down Expand Up @@ -131,7 +131,7 @@ func discardResponseInfo(r io.Reader) error {
return s.Err()
}

func (s *fetchPackSession) doRequest(method, url string, content *strings.Reader) (*http.Response, error) {
func (s *fetchPackSession) doRequest(method, url string, content *bytes.Buffer) (*http.Response, error) {
var body io.Reader
if content != nil {
body = content
Expand All @@ -158,44 +158,33 @@ func (s *fetchPackSession) doRequest(method, url string, content *strings.Reader
return res, nil
}

func (s *fetchPackSession) applyHeadersToRequest(req *http.Request, content *strings.Reader) {
// it requires a bytes.Buffer, because we need to know the length
func (s *fetchPackSession) applyHeadersToRequest(req *http.Request, content *bytes.Buffer) {
req.Header.Add("User-Agent", "git/1.0")
req.Header.Add("Host", "github.com")
req.Header.Add("Host", s.endpoint.Host)

if content == nil {
req.Header.Add("Accept", "*/*")
} else {
req.Header.Add("Accept", "application/x-git-upload-pack-result")
req.Header.Add("Content-Type", "application/x-git-upload-pack-request")
req.Header.Add("Content-Length", string(content.Len()))
return
}

req.Header.Add("Accept", "application/x-git-upload-pack-result")
req.Header.Add("Content-Type", "application/x-git-upload-pack-request")
req.Header.Add("Content-Length", strconv.Itoa(content.Len()))
}

func uploadPackRequestToReader(r *packp.UploadPackRequest) (*strings.Reader, error) {
var buf bytes.Buffer
e := pktline.NewEncoder(&buf)
func uploadPackRequestToReader(req *packp.UploadPackRequest) (*bytes.Buffer, error) {
buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)

for _, want := range r.Wants {
_ = e.Encodef("want %s\n", want)
if err := req.UploadRequest.Encode(buf); err != nil {
return nil, fmt.Errorf("sending upload-req message: %s", err)
}

for _, have := range r.Haves {
_ = e.Encodef("have %s\n", have)
if err := req.UploadHaves.Encode(buf); err != nil {
return nil, fmt.Errorf("sending haves message: %s", err)
}

if r.Depth != nil {
depth, ok := r.Depth.(packp.DepthCommits)
if !ok {
return nil, fmt.Errorf("only commit depth is supported")
}

if depth != 0 {
_ = e.Encodef("deepen %d\n", depth)
}
}

_ = e.Flush()
_ = e.EncodeString("done\n")

return strings.NewReader(buf.String()), nil
return buf, nil
}
4 changes: 2 additions & 2 deletions plumbing/transport/http/fetch_pack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (s *FetchPackSuite) TestuploadPackRequestToReader(c *C) {
c.Assert(err, IsNil)
b, _ := ioutil.ReadAll(sr)
c.Assert(string(b), Equals,
"0032want d82f291cde9987322c8a0c81a325e1ba6159684c\n"+
"0032want 2b41ef280fdb67a9b250678686a0c3e03b0a9989\n"+
"0032want 2b41ef280fdb67a9b250678686a0c3e03b0a9989\n"+
"0032want d82f291cde9987322c8a0c81a325e1ba6159684c\n0000"+
"0032have 6ecf0ef2c2dffb796033e5a02219af86ec6584e5\n0000"+
"0009done\n",
)
Expand Down
33 changes: 30 additions & 3 deletions remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"gopkg.in/src-d/go-git.v4/plumbing/format/packfile"
"gopkg.in/src-d/go-git.v4/plumbing/protocol/packp"
"gopkg.in/src-d/go-git.v4/plumbing/protocol/packp/capability"
"gopkg.in/src-d/go-git.v4/plumbing/protocol/packp/sideband"
"gopkg.in/src-d/go-git.v4/plumbing/storer"
"gopkg.in/src-d/go-git.v4/plumbing/transport"
"gopkg.in/src-d/go-git.v4/plumbing/transport/client"
Expand All @@ -22,6 +23,7 @@ var NoErrAlreadyUpToDate = errors.New("already up-to-date")
type Remote struct {
c *config.RemoteConfig
s Storer
p sideband.Progress

// cache fields, there during the connection is open
endpoint transport.Endpoint
Expand All @@ -31,8 +33,8 @@ type Remote struct {
refs memory.ReferenceStorage
}

func newRemote(s Storer, c *config.RemoteConfig) *Remote {
return &Remote{s: s, c: c}
func newRemote(s Storer, p sideband.Progress, c *config.RemoteConfig) *Remote {
return &Remote{s: s, p: p, c: c}
}

// Config return the config
Expand Down Expand Up @@ -125,7 +127,10 @@ func (r *Remote) Fetch(o *FetchOptions) (err error) {
}

defer checkClose(reader, &err)
if err := r.updateObjectStorage(reader); err != nil {

if err = r.updateObjectStorage(
r.buildSidebandIfSupported(req.Capabilities, reader),
); err != nil {
return err
}

Expand Down Expand Up @@ -178,6 +183,10 @@ func (r *Remote) buildRequest(
req.Capabilities.Set(capability.Shallow)
}

if r.p == nil && r.advRefs.Capabilities.Supports(capability.NoProgress) {
req.Capabilities.Set(capability.NoProgress)
}

for _, ref := range refs {
req.Wants = append(req.Wants, ref.Hash())
}
Expand Down Expand Up @@ -221,6 +230,24 @@ func (r *Remote) updateObjectStorage(reader io.Reader) error {
return err
}

func (r *Remote) buildSidebandIfSupported(l *capability.List, reader io.Reader) io.Reader {
var t sideband.Type

switch {
case l.Supports(capability.Sideband):
t = sideband.Sideband
case l.Supports(capability.Sideband64k):
t = sideband.Sideband64k
default:
return reader
}

d := sideband.NewDemuxer(t, reader)
d.Progress = r.p

return d
}

func (r *Remote) updateLocalReferenceStorage(specs []config.RefSpec, refs []*plumbing.Reference) error {
for _, spec := range specs {
for _, ref := range refs {
Expand Down
Loading