Skip to content

Commit cd77615

Browse files
authored
Add support for zstd compression (#1487)
* Add support for zstd compression Squashed commits: [6563fc8] Run ./hack/update-codegen.sh [c86a013] Fix linting issues [353dd69] Remove redundant break statements [a06b4cc] Fix some comments [fb0678f] Expose compression algorithms enum [3321d0c] Update mediaType according to compression for tarball layers (cherry picked from commit 56d4df5) [5124d2c] Fix unit tests (cherry picked from commit 7ce08d1) [2958f30] Optimize compression detection (cherry picked from commit 0a3437f) [c211a61] Add support for zstd-compressed layers (cherry picked from commit d8fb0f3) [0b9a996] Add zstd compression utilities (cherry picked from commit c645201) [7dd71ab] Add layer types with zstd compression (cherry picked from commit 328a14b) Signed-off-by: Lavrenti Frobeen <lavrenti@northflank.com> * Remove OCIRestrictedLayerZStd from MediaType enum * Remove unused checkCompression method * Add missing copyright headers * Fix goimport lint errors * Run "go mod tidy" in cmd/krane * Rename gw to zw in zstd.go * Fix typos and clean up comments * Create public compression package for Compression enum; disable "none" compression for tarball layer * Fix WithCompression method * Print warning if mediaType does not match selected compression * Add unit tests for new zstd compression implementation * Run go mod tidy in pkg/authn/k8schain and pkg/authn/kubernetes
1 parent c412593 commit cd77615

16 files changed

Lines changed: 1386 additions & 90 deletions

File tree

cmd/krane/go.sum

Lines changed: 790 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/docker/distribution v2.8.1+incompatible
99
github.com/docker/docker v20.10.20+incompatible
1010
github.com/google/go-cmp v0.5.9
11+
github.com/klauspost/compress v1.15.11
1112
github.com/mitchellh/go-homedir v1.1.0
1213
github.com/opencontainers/go-digest v1.0.0
1314
github.com/opencontainers/image-spec v1.1.0-rc2
@@ -28,7 +29,6 @@ require (
2829
github.com/gogo/protobuf v1.3.2 // indirect
2930
github.com/golang/protobuf v1.5.2 // indirect
3031
github.com/inconshreveable/mousetrap v1.0.1 // indirect
31-
github.com/klauspost/compress v1.15.11 // indirect
3232
github.com/moby/term v0.0.0-20210610120745-9d4ed1856297 // indirect
3333
github.com/morikuni/aec v1.0.0 // indirect
3434
github.com/pkg/errors v0.9.1 // indirect
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2022 Google LLC All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package compression
16+
17+
import (
18+
"bufio"
19+
"bytes"
20+
"io"
21+
22+
"github.com/google/go-containerregistry/internal/gzip"
23+
"github.com/google/go-containerregistry/internal/zstd"
24+
"github.com/google/go-containerregistry/pkg/compression"
25+
)
26+
27+
// Opener is a function that opens a stream and returns it as an
// io.ReadCloser, or returns an error if the stream could not be opened.
type Opener = func() (io.ReadCloser, error)
28+
29+
// GetCompression detects whether an Opener is compressed and which algorithm is used.
30+
func GetCompression(opener Opener) (compression.Compression, error) {
31+
rc, err := opener()
32+
if err != nil {
33+
return compression.None, err
34+
}
35+
defer rc.Close()
36+
37+
cp, _, err := PeekCompression(rc)
38+
if err != nil {
39+
return compression.None, err
40+
}
41+
42+
return cp, nil
43+
}
44+
45+
// PeekCompression detects whether the input stream is compressed and which algorithm is used.
46+
//
47+
// If r implements Peek, we will use that directly, otherwise a small number
48+
// of bytes are buffered to Peek at the gzip/zstd header, and the returned
49+
// PeekReader can be used as a replacement for the consumed input io.Reader.
50+
func PeekCompression(r io.Reader) (compression.Compression, PeekReader, error) {
51+
pr := intoPeekReader(r)
52+
53+
if isGZip, _, err := checkHeader(pr, gzip.MagicHeader); err != nil {
54+
return compression.None, pr, err
55+
} else if isGZip {
56+
return compression.GZip, pr, nil
57+
}
58+
59+
if isZStd, _, err := checkHeader(pr, zstd.MagicHeader); err != nil {
60+
return compression.None, pr, err
61+
} else if isZStd {
62+
return compression.ZStd, pr, nil
63+
}
64+
65+
return compression.None, pr, nil
66+
}
67+
68+
// PeekReader is an io.Reader that also implements Peek a la bufio.Reader.
type PeekReader interface {
	io.Reader
	// Peek returns the next n bytes without advancing the reader.
	Peek(n int) ([]byte, error)
}

// intoPeekReader creates a PeekReader from an io.Reader.
//
// If r already has a Peek method it is returned unchanged, so nothing is
// buffered twice. Otherwise r is wrapped in a bufio.Reader; after that, data
// must be read through the returned reader, since peeking may pull bytes out
// of r into the buffer.
func intoPeekReader(r io.Reader) PeekReader {
	if p, ok := r.(PeekReader); ok {
		return p
	}
	return bufio.NewReader(r)
}
83+
84+
// CheckHeader checks whether the first bytes from a PeekReader match an expected header
85+
func checkHeader(pr PeekReader, expectedHeader []byte) (bool, PeekReader, error) {
86+
header, err := pr.Peek(len(expectedHeader))
87+
if err != nil {
88+
// https://github.com/google/go-containerregistry/issues/367
89+
if err == io.EOF {
90+
return false, pr, nil
91+
}
92+
return false, pr, err
93+
}
94+
return bytes.Equal(header, expectedHeader), pr, nil
95+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2020 Google LLC All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package compression
16+
17+
import (
18+
"bytes"
19+
"io"
20+
"testing"
21+
22+
"github.com/google/go-containerregistry/internal/and"
23+
"github.com/google/go-containerregistry/internal/gzip"
24+
"github.com/google/go-containerregistry/internal/zstd"
25+
)
26+
27+
// Compressor turns a stream of plain data into a stream from which the
// compressed form can be read.
type Compressor = func(rc io.ReadCloser) io.ReadCloser
28+
// Decompressor turns a stream of compressed data into a stream from which the
// plain data can be read, or returns an error if that is not possible.
type Decompressor = func(rc io.ReadCloser) (io.ReadCloser, error)
29+
30+
func testPeekCompression(t *testing.T,
31+
compressionExpected string,
32+
compress Compressor,
33+
decompress Decompressor,
34+
) {
35+
content := "This is the input string."
36+
contentBuf := bytes.NewBufferString(content)
37+
38+
compressed := compress(io.NopCloser(contentBuf))
39+
compressionDetected, pr, err := PeekCompression(compressed)
40+
41+
if err != nil {
42+
t.Error("PeekCompression() =", err)
43+
}
44+
45+
if got := string(compressionDetected); got != compressionExpected {
46+
t.Errorf("PeekCompression(); got %q, content %q", got, compressionExpected)
47+
}
48+
49+
decompressed, err := decompress(withCloser(pr, compressed))
50+
51+
b, err := io.ReadAll(decompressed)
52+
if err != nil {
53+
t.Error("ReadAll() =", err)
54+
}
55+
56+
if got := string(b); got != content {
57+
t.Errorf("ReadAll(); got %q, content %q", got, content)
58+
}
59+
}
60+
61+
func TestPeekCompression(t *testing.T) {
62+
testPeekCompression(t, "gzip", gzip.ReadCloser, gzip.UnzipReadCloser)
63+
testPeekCompression(t, "zstd", zstd.ReadCloser, zstd.UnzipReadCloser)
64+
65+
nopCompress := func(rc io.ReadCloser) io.ReadCloser { return rc }
66+
nopDecompress := func(rc io.ReadCloser) (io.ReadCloser, error) { return rc, nil }
67+
68+
testPeekCompression(t, "none", nopCompress, nopDecompress)
69+
}
70+
71+
func withCloser(pr PeekReader, rc io.ReadCloser) io.ReadCloser {
72+
return &and.ReadCloser{
73+
Reader: pr,
74+
CloseFunc: rc.Close,
75+
}
76+
}

internal/gzip/zip.go

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/google/go-containerregistry/internal/and"
2525
)
2626

27-
var gzipMagicHeader = []byte{'\x1f', '\x8b'}
27+
var MagicHeader = []byte{'\x1f', '\x8b'}
2828

2929
// ReadCloser reads uncompressed input data from the io.ReadCloser and
3030
// returns an io.ReadCloser from which compressed data may be read.
@@ -84,7 +84,7 @@ func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
8484
}
8585

8686
// UnzipReadCloser reads compressed input data from the io.ReadCloser and
87-
// returns an io.ReadCloser from which uncompessed data may be read.
87+
// returns an io.ReadCloser from which uncompressed data may be read.
8888
func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
8989
gr, err := gzip.NewReader(r)
9090
if err != nil {
@@ -113,34 +113,5 @@ func Is(r io.Reader) (bool, error) {
113113
if err != nil {
114114
return false, err
115115
}
116-
return bytes.Equal(magicHeader, gzipMagicHeader), nil
117-
}
118-
119-
// PeekReader is an io.Reader that also implements Peek a la bufio.Reader.
120-
type PeekReader interface {
121-
io.Reader
122-
Peek(n int) ([]byte, error)
123-
}
124-
125-
// Peek detects whether the input stream is gzip compressed.
126-
//
127-
// If r implements Peek, we will use that directly, otherwise a small number
128-
// of bytes are buffered to Peek at the gzip header, and the returned
129-
// PeekReader can be used as a replacement for the consumed input io.Reader.
130-
func Peek(r io.Reader) (bool, PeekReader, error) {
131-
var pr PeekReader
132-
if p, ok := r.(PeekReader); ok {
133-
pr = p
134-
} else {
135-
pr = bufio.NewReader(r)
136-
}
137-
header, err := pr.Peek(2)
138-
if err != nil {
139-
// https://github.com/google/go-containerregistry/issues/367
140-
if err == io.EOF {
141-
return false, pr, nil
142-
}
143-
return false, pr, err
144-
}
145-
return bytes.Equal(header, gzipMagicHeader), pr, nil
116+
return bytes.Equal(magicHeader, MagicHeader), nil
146117
}

internal/zstd/zstd.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2022 Google LLC All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package zstd
16+
17+
import (
18+
"bufio"
19+
"bytes"
20+
"io"
21+
22+
"github.com/google/go-containerregistry/internal/and"
23+
"github.com/klauspost/compress/zstd"
24+
)
25+
26+
// MagicHeader is the four-byte sequence that a stream must start with to be
// detected as zstd compressed.
var MagicHeader = []byte{'\x28', '\xb5', '\x2f', '\xfd'}
27+
28+
// ReadCloser reads uncompressed input data from the io.ReadCloser and
29+
// returns an io.ReadCloser from which compressed data may be read.
30+
// This uses zstd level 1 for the compression.
31+
func ReadCloser(r io.ReadCloser) io.ReadCloser {
32+
return ReadCloserLevel(r, 1)
33+
}
34+
35+
// ReadCloserLevel reads uncompressed input data from the io.ReadCloser and
36+
// returns an io.ReadCloser from which compressed data may be read.
37+
func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
38+
pr, pw := io.Pipe()
39+
40+
// For highly compressible layers, zstd.Writer will output a very small
41+
// number of bytes per Write(). This is normally fine, but when pushing
42+
// to a registry, we want to ensure that we're taking full advantage of
43+
// the available bandwidth instead of sending tons of tiny writes over
44+
// the wire.
45+
// 64K ought to be small enough for anybody.
46+
bw := bufio.NewWriterSize(pw, 2<<16)
47+
48+
// Returns err so we can pw.CloseWithError(err)
49+
go func() error {
50+
// TODO(go1.14): Just defer {pw,zw,r}.Close like you'd expect.
51+
// Context: https://golang.org/issue/24283
52+
zw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(level)))
53+
if err != nil {
54+
return pw.CloseWithError(err)
55+
}
56+
57+
if _, err := io.Copy(zw, r); err != nil {
58+
defer r.Close()
59+
defer zw.Close()
60+
return pw.CloseWithError(err)
61+
}
62+
63+
// Close zstd writer to Flush it and write zstd trailers.
64+
if err := zw.Close(); err != nil {
65+
return pw.CloseWithError(err)
66+
}
67+
68+
// Flush bufio writer to ensure we write out everything.
69+
if err := bw.Flush(); err != nil {
70+
return pw.CloseWithError(err)
71+
}
72+
73+
// We don't really care if these fail.
74+
defer pw.Close()
75+
defer r.Close()
76+
77+
return nil
78+
}()
79+
80+
return pr
81+
}
82+
83+
// UnzipReadCloser reads compressed input data from the io.ReadCloser and
84+
// returns an io.ReadCloser from which uncompressed data may be read.
85+
func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
86+
gr, err := zstd.NewReader(r)
87+
if err != nil {
88+
return nil, err
89+
}
90+
return &and.ReadCloser{
91+
Reader: gr,
92+
CloseFunc: func() error {
93+
// If the unzip fails, then this seems to return the same
94+
// error as the read. We don't want this to interfere with
95+
// us closing the main ReadCloser, since this could leave
96+
// an open file descriptor (fails on Windows).
97+
gr.Close()
98+
return r.Close()
99+
},
100+
}, nil
101+
}
102+
103+
// Is detects whether the input stream is compressed.
104+
func Is(r io.Reader) (bool, error) {
105+
magicHeader := make([]byte, 4)
106+
n, err := r.Read(magicHeader)
107+
if n == 0 && err == io.EOF {
108+
return false, nil
109+
}
110+
if err != nil {
111+
return false, err
112+
}
113+
return bytes.Equal(magicHeader, MagicHeader), nil
114+
}

0 commit comments

Comments
 (0)