Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

Fix ssh workflow #96

Merged
merged 7 commits into from
Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 78 additions & 50 deletions clients/ssh/git_upload_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
package ssh

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"strings"

"gopkg.in/src-d/go-git.v4/clients/common"
"gopkg.in/src-d/go-git.v4/formats/packp/advrefs"

"golang.org/x/crypto/ssh"
)
Expand All @@ -24,6 +23,8 @@ var (
ErrUploadPackAnswerFormat = errors.New("git-upload-pack bad answer format")
ErrUnsupportedVCS = errors.New("only git is supported")
ErrUnsupportedRepo = errors.New("only github.com is supported")

nak = []byte("0008NAK\n")
)

// GitUploadPackService holds the service information.
Expand Down Expand Up @@ -134,81 +135,108 @@ func (s *GitUploadPackService) Disconnect() (err error) {
return s.client.Close()
}

// Fetch retrieves the GitUploadPack form the repository.
// You must be connected to the repository before using this method
// (using the ConnectWithAuth() method).
// TODO: fetch should really reuse the info session instead of openning a new
// one
// Fetch returns a packfile for a given upload request. It opens a new
// SSH session on a connected GitUploadPackService, sends the given
// upload request to the server and returns a reader for the received
// packfile. Closing the returned reader will close the SSH session.
func (s *GitUploadPackService) Fetch(r *common.GitUploadPackRequest) (rc io.ReadCloser, err error) {
if !s.connected {
return nil, ErrNotConnected
}

session, err := s.client.NewSession()
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot open SSH session: %s", err)
}

defer func() {
// the session can be closed by the other endpoint,
// therefore we must ignore a close error.
_ = session.Close()
}()

si, err := session.StdinPipe()
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot pipe remote stdin: %s", err)
}

so, err := session.StdoutPipe()
if err != nil {
return nil, err
}

if err := session.Start(s.getCommand()); err != nil {
return nil, err
return nil, fmt.Errorf("cannot pipe remote stdout: %s", err)
}

done := make(chan error)
go func() {
fmt.Fprintln(si, r.String())
err = si.Close()
done <- session.Run(s.getCommand())
}()

// TODO: investigate this *ExitError type (command fails or
// doesn't complete successfully), as it is happenning all
// the time, but everything seems to work fine.
if err := session.Wait(); err != nil {
if _, ok := err.(*ssh.ExitError); !ok {
return nil, err
}
}

// read until the header of the second answer
soBuf := bufio.NewReader(so)
token := "0000"
for {
var line string
line, err = soBuf.ReadString('\n')
if err == io.EOF {
return nil, ErrUploadPackAnswerFormat
}
if line[0:len(token)] == token {
break
}
}

data, err := ioutil.ReadAll(soBuf)
if err := skipAdvRef(so); err != nil {
return nil, fmt.Errorf("skipping advertised-refs: %s", err)
}

// send the upload request
_, err = io.Copy(si, r.Reader())
if err != nil {
return nil, err
return nil, fmt.Errorf("sending upload-req message: %s", err)
}

if err := si.Close(); err != nil {
return nil, fmt.Errorf("closing input: %s", err)
}

buf := bytes.NewBuffer(data)
return ioutil.NopCloser(buf), nil
// TODO support multi_ack mode
// TODO support multi_ack_detailed mode
// TODO support acks for common objects
// TODO build a proper state machine for all these processing options
buf := make([]byte, len(nak))
if _, err := io.ReadFull(so, buf); err != nil {
return nil, fmt.Errorf("looking for NAK: %s", err)
}
if !bytes.Equal(buf, nak) {
return nil, fmt.Errorf("NAK answer not found")
}

return &fetchSession{
Reader: so,
session: session,
done: done,
}, nil
}

func skipAdvRef(so io.Reader) error {
d := advrefs.NewDecoder(so)
ar := advrefs.New()

return d.Decode(ar)
}

type fetchSession struct {
io.Reader
session *ssh.Session
done chan error
}

// Close closes the session and collects the output state of the remote
// SSH command.
//
// If both the remote command and the closing of the session completes
// susccessfully it returns nil.
//
// If the remote command completes unsuccessfully or is interrupted by a
// signal, it returns the corresponding *ExitError.
//
// Otherwise, if clossing the SSH session fails it returns the close
// error. Closing the session when the other has already close it is
// not cosidered an error.
func (f *fetchSession) Close() (err error) {
if err := <-f.done; err != nil {
return err
}

if err := f.session.Close(); err != nil && err != io.EOF {
return err
}

return nil
}

func (s *GitUploadPackService) getCommand() string {
directory := s.endpoint.Path
directory = directory[1:len(directory)]

return fmt.Sprintf("git-upload-pack %s", directory)
return fmt.Sprintf("git-upload-pack '%s'", directory)
}
15 changes: 13 additions & 2 deletions clients/ssh/git_upload_pack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,21 @@ func (s *RemoteSuite) TestFetch(c *C) {
req.Want(core.NewHash("e8d3ffab552895c19b9fcf7aa264d277cde33881"))
reader, err := r.Fetch(req)
c.Assert(err, IsNil)
defer func() { c.Assert(reader.Close(), IsNil) }()

b, err := ioutil.ReadAll(reader)
c.Assert(err, IsNil)

//this is failling randomly, should be fixed ASAP
c.Check(len(b), Equals, 85585)
}

func (s *RemoteSuite) TestFetchError(c *C) {
r := NewGitUploadPackService(s.Endpoint)
c.Assert(r.Connect(), IsNil)
defer func() { c.Assert(r.Disconnect(), IsNil) }()

req := &common.GitUploadPackRequest{}
req.Want(core.NewHash("1111111111111111111111111111111111111111"))

_, err := r.Fetch(req)
c.Assert(err, Not(IsNil))
}