Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

Commit 585dc4f

Browse files
alcortesmmcuadros
authored andcommitted
ulreq: adds encoder and decoder for upload-request messages (#106)
* ulreq: adds encoder and decoder for upload-request messages * ulreq: stop using _test suffix for testing package names (@mcuadros comment)
1 parent 1afef7e commit 585dc4f

File tree

8 files changed

+1493
-34
lines changed

8 files changed

+1493
-34
lines changed

clients/ssh/git_upload_pack.go

Lines changed: 106 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
"gopkg.in/src-d/go-git.v4/clients/common"
1212
"gopkg.in/src-d/go-git.v4/formats/packp/advrefs"
13+
"gopkg.in/src-d/go-git.v4/formats/packp/pktline"
14+
"gopkg.in/src-d/go-git.v4/formats/packp/ulreq"
1315

1416
"golang.org/x/crypto/ssh"
1517
)
@@ -24,7 +26,8 @@ var (
2426
ErrUnsupportedVCS = errors.New("only git is supported")
2527
ErrUnsupportedRepo = errors.New("only github.com is supported")
2628

27-
nak = []byte("0008NAK\n")
29+
nak = []byte("NAK")
30+
eol = []byte("\n")
2831
)
2932

3033
// GitUploadPackService holds the service information.
@@ -139,75 +142,145 @@ func (s *GitUploadPackService) Disconnect() (err error) {
139142
// SSH session on a connected GitUploadPackService, sends the given
140143
// upload request to the server and returns a reader for the received
141144
// packfile. Closing the returned reader will close the SSH session.
142-
func (s *GitUploadPackService) Fetch(r *common.GitUploadPackRequest) (rc io.ReadCloser, err error) {
145+
func (s *GitUploadPackService) Fetch(req *common.GitUploadPackRequest) (rc io.ReadCloser, err error) {
143146
if !s.connected {
144147
return nil, ErrNotConnected
145148
}
146149

147-
session, err := s.client.NewSession()
150+
session, i, o, done, err := openSSHSession(s.client, s.getCommand())
148151
if err != nil {
149152
return nil, fmt.Errorf("cannot open SSH session: %s", err)
150153
}
151154

152-
si, err := session.StdinPipe()
155+
if err := talkPackProtocol(i, o, req); err != nil {
156+
return nil, err
157+
}
158+
159+
return &fetchSession{
160+
Reader: o,
161+
session: session,
162+
done: done,
163+
}, nil
164+
}
165+
166+
func openSSHSession(c *ssh.Client, cmd string) (
167+
*ssh.Session, io.WriteCloser, io.Reader, <-chan error, error) {
168+
169+
session, err := c.NewSession()
153170
if err != nil {
154-
return nil, fmt.Errorf("cannot pipe remote stdin: %s", err)
171+
return nil, nil, nil, nil, fmt.Errorf("cannot open SSH session: %s", err)
155172
}
156173

157-
so, err := session.StdoutPipe()
174+
i, err := session.StdinPipe()
158175
if err != nil {
159-
return nil, fmt.Errorf("cannot pipe remote stdout: %s", err)
176+
return nil, nil, nil, nil, fmt.Errorf("cannot pipe remote stdin: %s", err)
177+
}
178+
179+
o, err := session.StdoutPipe()
180+
if err != nil {
181+
return nil, nil, nil, nil, fmt.Errorf("cannot pipe remote stdout: %s", err)
160182
}
161183

162184
done := make(chan error)
163185
go func() {
164-
done <- session.Run(s.getCommand())
186+
done <- session.Run(cmd)
165187
}()
166188

167-
if err := skipAdvRef(so); err != nil {
168-
return nil, fmt.Errorf("skipping advertised-refs: %s", err)
189+
return session, i, o, done, nil
190+
}
191+
192+
// TODO support multi_ack mode
193+
// TODO support multi_ack_detailed mode
194+
// TODO support acks for common objects
195+
// TODO build a proper state machine for all these processing options
196+
func talkPackProtocol(w io.WriteCloser, r io.Reader,
197+
req *common.GitUploadPackRequest) error {
198+
199+
if err := skipAdvRef(r); err != nil {
200+
return fmt.Errorf("skipping advertised-refs: %s", err)
169201
}
170202

171-
// send the upload request
172-
_, err = io.Copy(si, r.Reader())
173-
if err != nil {
174-
return nil, fmt.Errorf("sending upload-req message: %s", err)
203+
if err := sendUlReq(w, req); err != nil {
204+
return fmt.Errorf("sending upload-req message: %s", err)
175205
}
176206

177-
if err := si.Close(); err != nil {
178-
return nil, fmt.Errorf("closing input: %s", err)
207+
if err := sendHaves(w, req); err != nil {
208+
return fmt.Errorf("sending haves message: %s", err)
179209
}
180210

181-
// TODO support multi_ack mode
182-
// TODO support multi_ack_detailed mode
183-
// TODO support acks for common objects
184-
// TODO build a proper state machine for all these processing options
185-
buf := make([]byte, len(nak))
186-
if _, err := io.ReadFull(so, buf); err != nil {
187-
return nil, fmt.Errorf("looking for NAK: %s", err)
211+
if err := sendDone(w); err != nil {
212+
return fmt.Errorf("sending done message: %s", err)
188213
}
189-
if !bytes.Equal(buf, nak) {
190-
return nil, fmt.Errorf("NAK answer not found")
214+
215+
if err := w.Close(); err != nil {
216+
return fmt.Errorf("closing input: %s", err)
191217
}
192218

193-
return &fetchSession{
194-
Reader: so,
195-
session: session,
196-
done: done,
197-
}, nil
219+
if err := readNAK(r); err != nil {
220+
return fmt.Errorf("reading NAK: %s", err)
221+
}
222+
223+
return nil
198224
}
199225

200-
func skipAdvRef(so io.Reader) error {
201-
d := advrefs.NewDecoder(so)
226+
func skipAdvRef(r io.Reader) error {
227+
d := advrefs.NewDecoder(r)
202228
ar := advrefs.New()
203229

204230
return d.Decode(ar)
205231
}
206232

233+
func sendUlReq(w io.Writer, req *common.GitUploadPackRequest) error {
234+
ur := ulreq.New()
235+
ur.Wants = req.Wants
236+
ur.Depth = ulreq.DepthCommits(req.Depth)
237+
e := ulreq.NewEncoder(w)
238+
239+
return e.Encode(ur)
240+
}
241+
242+
func sendHaves(w io.Writer, req *common.GitUploadPackRequest) error {
243+
e := pktline.NewEncoder(w)
244+
for _, have := range req.Haves {
245+
if err := e.Encodef("have %s\n", have); err != nil {
246+
return fmt.Errorf("sending haves for %q: err ", have, err)
247+
}
248+
}
249+
250+
if len(req.Haves) != 0 {
251+
if err := e.Flush(); err != nil {
252+
return fmt.Errorf("sending flush-pkt after haves: %s", err)
253+
}
254+
}
255+
256+
return nil
257+
}
258+
259+
func sendDone(w io.Writer) error {
260+
e := pktline.NewEncoder(w)
261+
262+
return e.Encodef("done\n")
263+
}
264+
265+
func readNAK(r io.Reader) error {
266+
s := pktline.NewScanner(r)
267+
if !s.Scan() {
268+
return s.Err()
269+
}
270+
271+
b := s.Bytes()
272+
b = bytes.TrimSuffix(b, eol)
273+
if !bytes.Equal(b, nak) {
274+
return fmt.Errorf("expecting NAK, found %q instead", string(b))
275+
}
276+
277+
return nil
278+
}
279+
207280
type fetchSession struct {
208281
io.Reader
209282
session *ssh.Session
210-
done chan error
283+
done <-chan error
211284
}
212285

213286
// Close closes the session and collects the output state of the remote

clients/ssh/git_upload_pack_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ func (s *RemoteSuite) TestFetchError(c *C) {
136136
req := &common.GitUploadPackRequest{}
137137
req.Want(core.NewHash("1111111111111111111111111111111111111111"))
138138

139-
_, err := r.Fetch(req)
139+
reader, err := r.Fetch(req)
140+
c.Assert(err, IsNil)
141+
142+
err = reader.Close()
140143
c.Assert(err, Not(IsNil))
141144
}

0 commit comments

Comments
 (0)