Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

remote: sideband support #164

Merged
merged 4 commits into from
Dec 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions plumbing/protocol/packp/sideband/demux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sideband

import (
"bytes"
"errors"
"fmt"
"io"
Expand All @@ -15,21 +14,22 @@ var ErrMaxPackedExceeded = errors.New("max. packed size exceeded")
// Progress allows to read the progress information
type Progress interface {
io.Reader
io.Writer
}

// Demuxer demultiplex the progress reports and error info interleaved with the
// packfile itself.
//
// A sideband has three different channels the main one call PackData contains
// A sideband has three different channels the main one, call PackData, contains
// the packfile data, the ErrorMessage channel, that contains server errors and
// the last one ProgressMessage channel containing information about the ongoing
// task happening in the server (optinal, can be suppressed sending NoProgress
// task happening in the server (optional, can be suppressed sending NoProgress
// or Quiet capabilities to the server)
//
// In order to demultiplex the data stream, method `Read` should be called to
// retrieve the PackData channel, the incoming data from the ProgressMessage is
// stored and can be read from `Progress` field, if any message is retrieved
// from the ErrorMessage channel an error is returned and we can assume that the
// written at `Progress` (if any), if any message is retrieved from the
// ErrorMessage channel an error is returned and we can assume that the
// connection has been closed.
type Demuxer struct {
t Type
Expand All @@ -39,7 +39,7 @@ type Demuxer struct {
max int
pending []byte

// Progress contains progress information
// Progress is where the progress messages are stored
Progress Progress
}

Expand All @@ -51,21 +51,19 @@ func NewDemuxer(t Type, r io.Reader) *Demuxer {
}

return &Demuxer{
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
Progress: bytes.NewBuffer(nil),
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
}
}

// Read reads up to len(p) bytes from the PackData channel into p, an error can
// be return if an error happends when reading or if a message is sent in the
// be return if an error happens when reading or if a message is sent in the
// ErrorMessage channel.
//
// If a ProgressMessage is read, it won't be copied to b. Instead of this, it is
// stored and can be read through the reader Progress. If the n value returned
// is zero, err will be nil unless an error reading happens.
// When a ProgressMessage is read, is not copy to b, instead of this is written
// to the Progress
func (d *Demuxer) Read(b []byte) (n int, err error) {
var read, req int

Expand Down Expand Up @@ -126,13 +124,17 @@ func (d *Demuxer) nextPackData() ([]byte, error) {
case PackData:
return content[1:], nil
case ProgressMessage:
_, err := d.Progress.(io.Writer).Write(content[1:])
return nil, err
if d.Progress != nil {
_, err := d.Progress.Write(content[1:])
return nil, err
}
case ErrorMessage:
return nil, fmt.Errorf("unexpected error: %s", content[1:])
default:
return nil, fmt.Errorf("unknown channel %s", content)
}

return nil, nil
}

func (d *Demuxer) getPending() (b []byte) {
Expand Down
3 changes: 3 additions & 0 deletions plumbing/protocol/packp/sideband/demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (s *SidebandSuite) TestDecode(c *C) {
buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)
e.Encode(PackData.WithPayload(expected[0:8]))
e.Encode(ProgressMessage.WithPayload([]byte{'F', 'O', 'O', '\n'}))
e.Encode(PackData.WithPayload(expected[8:16]))
e.Encode(PackData.WithPayload(expected[16:26]))

Expand Down Expand Up @@ -92,6 +93,8 @@ func (s *SidebandSuite) TestDecodeWithProgress(c *C) {

content := make([]byte, 26)
d := NewDemuxer(Sideband64k, buf)
d.Progress = bytes.NewBuffer(nil)

n, err := io.ReadFull(d, content)
c.Assert(err, IsNil)
c.Assert(n, Equals, 26)
Expand Down
6 changes: 6 additions & 0 deletions plumbing/protocol/packp/ulreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func NewUploadRequestFromCapabilities(adv *capability.List) *UploadRequest {
r.Capabilities.Set(capability.MultiACK)
}

if adv.Supports(capability.Sideband64k) {
r.Capabilities.Set(capability.Sideband64k)
} else if adv.Supports(capability.Sideband) {
r.Capabilities.Set(capability.Sideband)
}

if adv.Supports(capability.ThinPack) {
r.Capabilities.Set(capability.ThinPack)
}
Expand Down
2 changes: 1 addition & 1 deletion plumbing/protocol/packp/ulreq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *UlReqSuite) TestNewUploadRequestFromCapabilities(c *C) {

r := NewUploadRequestFromCapabilities(cap)
c.Assert(r.Capabilities.String(), Equals,
"multi_ack_detailed thin-pack ofs-delta agent=go-git/4.x",
"multi_ack_detailed side-band-64k thin-pack ofs-delta agent=go-git/4.x",
)
}

Expand Down
2 changes: 0 additions & 2 deletions plumbing/transport/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ func transformSCPLikeIfNeeded(endpoint string) string {
var UnsupportedCapabilities = []capability.Capability{
capability.MultiACK,
capability.MultiACKDetailed,
capability.Sideband,
capability.Sideband64k,
capability.ThinPack,
}

Expand Down
47 changes: 18 additions & 29 deletions plumbing/transport/http/fetch_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"strconv"

"gopkg.in/src-d/go-git.v4/plumbing"
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *fetchPackSession) Close() error {
return nil
}

func (s *fetchPackSession) doRequest(method, url string, content *strings.Reader) (*http.Response, error) {
func (s *fetchPackSession) doRequest(method, url string, content *bytes.Buffer) (*http.Response, error) {
var body io.Reader
if content != nil {
body = content
Expand All @@ -144,44 +144,33 @@ func (s *fetchPackSession) doRequest(method, url string, content *strings.Reader
return res, nil
}

func (s *fetchPackSession) applyHeadersToRequest(req *http.Request, content *strings.Reader) {
// it requires a bytes.Buffer, because we need to know the length
func (s *fetchPackSession) applyHeadersToRequest(req *http.Request, content *bytes.Buffer) {
req.Header.Add("User-Agent", "git/1.0")
req.Header.Add("Host", "github.com")
req.Header.Add("Host", s.endpoint.Host)

if content == nil {
req.Header.Add("Accept", "*/*")
} else {
req.Header.Add("Accept", "application/x-git-upload-pack-result")
req.Header.Add("Content-Type", "application/x-git-upload-pack-request")
req.Header.Add("Content-Length", string(content.Len()))
return
}

req.Header.Add("Accept", "application/x-git-upload-pack-result")
req.Header.Add("Content-Type", "application/x-git-upload-pack-request")
req.Header.Add("Content-Length", strconv.Itoa(content.Len()))
}

func uploadPackRequestToReader(r *packp.UploadPackRequest) (*strings.Reader, error) {
var buf bytes.Buffer
e := pktline.NewEncoder(&buf)
func uploadPackRequestToReader(req *packp.UploadPackRequest) (*bytes.Buffer, error) {
buf := bytes.NewBuffer(nil)
e := pktline.NewEncoder(buf)

for _, want := range r.Wants {
_ = e.Encodef("want %s\n", want)
if err := req.UploadRequest.Encode(buf); err != nil {
return nil, fmt.Errorf("sending upload-req message: %s", err)
}

for _, have := range r.Haves {
_ = e.Encodef("have %s\n", have)
if err := req.UploadHaves.Encode(buf); err != nil {
return nil, fmt.Errorf("sending haves message: %s", err)
}

if r.Depth != nil {
depth, ok := r.Depth.(packp.DepthCommits)
if !ok {
return nil, fmt.Errorf("only commit depth is supported")
}

if depth != 0 {
_ = e.Encodef("deepen %d\n", depth)
}
}

_ = e.Flush()
_ = e.EncodeString("done\n")

return strings.NewReader(buf.String()), nil
return buf, nil
}
4 changes: 2 additions & 2 deletions plumbing/transport/http/fetch_pack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (s *FetchPackSuite) TestuploadPackRequestToReader(c *C) {
c.Assert(err, IsNil)
b, _ := ioutil.ReadAll(sr)
c.Assert(string(b), Equals,
"0032want d82f291cde9987322c8a0c81a325e1ba6159684c\n"+
"0032want 2b41ef280fdb67a9b250678686a0c3e03b0a9989\n"+
"0032want 2b41ef280fdb67a9b250678686a0c3e03b0a9989\n"+
"0032want d82f291cde9987322c8a0c81a325e1ba6159684c\n0000"+
"0032have 6ecf0ef2c2dffb796033e5a02219af86ec6584e5\n0000"+
"0009done\n",
)
Expand Down
33 changes: 30 additions & 3 deletions remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"gopkg.in/src-d/go-git.v4/plumbing/format/packfile"
"gopkg.in/src-d/go-git.v4/plumbing/protocol/packp"
"gopkg.in/src-d/go-git.v4/plumbing/protocol/packp/capability"
"gopkg.in/src-d/go-git.v4/plumbing/protocol/packp/sideband"
"gopkg.in/src-d/go-git.v4/plumbing/storer"
"gopkg.in/src-d/go-git.v4/plumbing/transport"
"gopkg.in/src-d/go-git.v4/plumbing/transport/client"
Expand All @@ -22,6 +23,7 @@ var NoErrAlreadyUpToDate = errors.New("already up-to-date")
type Remote struct {
c *config.RemoteConfig
s Storer
p sideband.Progress

// cache fields, there during the connection is open
endpoint transport.Endpoint
Expand All @@ -31,8 +33,8 @@ type Remote struct {
refs memory.ReferenceStorage
}

func newRemote(s Storer, c *config.RemoteConfig) *Remote {
return &Remote{s: s, c: c}
func newRemote(s Storer, p sideband.Progress, c *config.RemoteConfig) *Remote {
return &Remote{s: s, p: p, c: c}
}

// Config return the config
Expand Down Expand Up @@ -125,7 +127,10 @@ func (r *Remote) Fetch(o *FetchOptions) (err error) {
}

defer checkClose(reader, &err)
if err := r.updateObjectStorage(reader); err != nil {

if err = r.updateObjectStorage(
r.buildSidebandIfSupported(req.Capabilities, reader),
); err != nil {
return err
}

Expand Down Expand Up @@ -178,6 +183,10 @@ func (r *Remote) buildRequest(
req.Capabilities.Set(capability.Shallow)
}

if r.p == nil && r.advRefs.Capabilities.Supports(capability.NoProgress) {
req.Capabilities.Set(capability.NoProgress)
}

for _, ref := range refs {
req.Wants = append(req.Wants, ref.Hash())
}
Expand Down Expand Up @@ -221,6 +230,24 @@ func (r *Remote) updateObjectStorage(reader io.Reader) error {
return err
}

func (r *Remote) buildSidebandIfSupported(l *capability.List, reader io.Reader) io.Reader {
var t sideband.Type

switch {
case l.Supports(capability.Sideband):
t = sideband.Sideband
case l.Supports(capability.Sideband64k):
t = sideband.Sideband64k
default:
return reader
}

d := sideband.NewDemuxer(t, reader)
d.Progress = r.p

return d
}

func (r *Remote) updateLocalReferenceStorage(specs []config.RefSpec, refs []*plumbing.Reference) error {
for _, spec := range specs {
for _, ref := range refs {
Expand Down
Loading