Skip to content

Commit 29e2dea

Browse files
lengrongfudcantah
authored andcommitted
fix pusher concurrent close channel
Signed-off-by: rongfu.leng <rongfu.leng@daocloud.io> (cherry picked from commit 63a7d8a) Signed-off-by: Danny Canter <danny@dcantah.dev>
1 parent 9a194f7 commit 29e2dea

File tree

2 files changed

+11
-14
lines changed

2 files changed

+11
-14
lines changed

remotes/docker/pusher.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net/http"
2525
"net/url"
2626
"strings"
27+
"sync"
2728
"time"
2829

2930
"github.com/containerd/containerd/content"
@@ -317,9 +318,10 @@ type pushWriter struct {
317318

318319
pipe *io.PipeWriter
319320

320-
pipeC chan *io.PipeWriter
321-
respC chan *http.Response
322-
errC chan error
321+
pipeC chan *io.PipeWriter
322+
respC chan *http.Response
323+
closeOnce sync.Once
324+
errC chan error
323325

324326
isManifest bool
325327

@@ -395,14 +397,9 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
395397
func (pw *pushWriter) Close() error {
396398
// Ensure pipeC is closed but handle `Close()` being
397399
// called multiple times without panicking
398-
select {
399-
case _, ok := <-pw.pipeC:
400-
if ok {
401-
close(pw.pipeC)
402-
}
403-
default:
400+
pw.closeOnce.Do(func() {
404401
close(pw.pipeC)
405-
}
402+
})
406403
if pw.pipe != nil {
407404
status, err := pw.tracker.GetStatus(pw.ref)
408405
if err == nil && !status.Committed {

remotes/docker/pusher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func Test_dockerPusher_push(t *testing.T) {
293293
dp dockerPusher
294294
dockerBaseObject string
295295
args args
296-
checkerFunc func(writer pushWriter) bool
296+
checkerFunc func(writer *pushWriter) bool
297297
wantErr error
298298
}{
299299
{
@@ -306,7 +306,7 @@ func Test_dockerPusher_push(t *testing.T) {
306306
ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()),
307307
unavailableOnFail: false,
308308
},
309-
checkerFunc: func(writer pushWriter) bool {
309+
checkerFunc: func(writer *pushWriter) bool {
310310
select {
311311
case resp := <-writer.respC:
312312
// 201 should be the response code when uploading a new manifest
@@ -340,7 +340,7 @@ func Test_dockerPusher_push(t *testing.T) {
340340
ref: fmt.Sprintf("layer-%s", layerContentDigest.String()),
341341
unavailableOnFail: false,
342342
},
343-
checkerFunc: func(writer pushWriter) bool {
343+
checkerFunc: func(writer *pushWriter) bool {
344344
select {
345345
case resp := <-writer.respC:
346346
// 201 should be the response code when uploading a new blob
@@ -379,7 +379,7 @@ func Test_dockerPusher_push(t *testing.T) {
379379
}
380380

381381
// test whether a proper response has been received after the push operation
382-
assert.True(t, test.checkerFunc(*pw))
382+
assert.True(t, test.checkerFunc(pw))
383383

384384
})
385385
}

0 commit comments

Comments
 (0)