Skip to content

Commit c03560f

Browse files
authored
zstd: Add ResetWithOptions to encoder/decoder (#1122)
## New API Methods ``` func (*Encoder) ResetWithOptions(w io.Writer, opts ...EOption) error func (*Decoder) ResetWithOptions(r io.Reader, opts ...DOption) error ``` New Options // Encoder - clears dictionary `WithEncoderDictDelete() EOption` // Decoder - removes dicts by ID; no args clears all `WithDecoderDictDelete(ids ...uint32) DOption` Option Reset Compatibility Can be changed with ResetWithOptions: - Encoder: WithEncoderCRC, WithEncoderPadding, WithZeroFrames, WithAllLitEntropyCompression, WithNoEntropyCompression, WithSingleSegment, WithEncoderDict, WithEncoderDictRaw, WithEncoderDictDelete - Decoder: WithDecoderMaxMemory, WithDecoderMaxWindow, WithDecoderDicts, WithDecoderDictRaw, WithDecoderDictDelete, WithDecodeAllCapLimit, IgnoreChecksum Cannot be changed with ResetWithOptions: - Encoder: WithEncoderConcurrency, WithWindowSize, WithEncoderLevel, WithLowerEncoderMem - Decoder: WithDecoderLowmem, WithDecoderConcurrency, WithDecodeBuffersBelow
1 parent 0874ab8 commit c03560f

6 files changed

Lines changed: 331 additions & 19 deletions

File tree

zstd/decoder.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ type Decoder struct {
3939

4040
frame *frameDec
4141

42-
// Custom dictionaries.
43-
dicts map[uint32]*dict
44-
4542
// streamWg is the waitgroup for all streams
4643
streamWg sync.WaitGroup
4744
}
@@ -101,12 +98,10 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
10198
d.current.err = ErrDecoderNilInput
10299
}
103100

104-
// Transfer option dicts.
105-
d.dicts = make(map[uint32]*dict, len(d.o.dicts))
106-
for _, dc := range d.o.dicts {
107-
d.dicts[dc.id] = dc
101+
// Initialize dict map if needed.
102+
if d.o.dicts == nil {
103+
d.o.dicts = make(map[uint32]*dict)
108104
}
109-
d.o.dicts = nil
110105

111106
// Create decoders
112107
d.decoders = make(chan *blockDec, d.o.concurrent)
@@ -238,6 +233,21 @@ func (d *Decoder) Reset(r io.Reader) error {
238233
return nil
239234
}
240235

236+
// ResetWithOptions will reset the decoder and apply the given options
237+
// for the next stream or DecodeAll operation.
238+
// Options are applied on top of the existing options.
239+
// Some options cannot be changed on reset and will return an error.
240+
func (d *Decoder) ResetWithOptions(r io.Reader, opts ...DOption) error {
241+
d.o.resetOpt = true
242+
defer func() { d.o.resetOpt = false }()
243+
for _, o := range opts {
244+
if err := o(&d.o); err != nil {
245+
return err
246+
}
247+
}
248+
return d.Reset(r)
249+
}
250+
241251
// drainOutput will drain the output until errEndOfStream is sent.
242252
func (d *Decoder) drainOutput() {
243253
if d.current.cancel != nil {
@@ -930,7 +940,7 @@ decodeStream:
930940
}
931941

932942
func (d *Decoder) setDict(frame *frameDec) (err error) {
933-
dict, ok := d.dicts[frame.DictionaryID]
943+
dict, ok := d.o.dicts[frame.DictionaryID]
934944
if ok {
935945
if debugDecoder {
936946
println("setting dict", frame.DictionaryID)

zstd/decoder_options.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ type decoderOptions struct {
2020
concurrent int
2121
maxDecodedSize uint64
2222
maxWindowSize uint64
23-
dicts []*dict
23+
dicts map[uint32]*dict
2424
ignoreChecksum bool
2525
limitToCap bool
2626
decodeBufsBelow int
27+
resetOpt bool
2728
}
2829

2930
func (o *decoderOptions) setDefault() {
@@ -42,8 +43,15 @@ func (o *decoderOptions) setDefault() {
4243

4344
// WithDecoderLowmem will set whether to use a lower amount of memory,
4445
// but possibly have to allocate more while running.
46+
// Cannot be changed with ResetWithOptions.
4547
func WithDecoderLowmem(b bool) DOption {
46-
return func(o *decoderOptions) error { o.lowMem = b; return nil }
48+
return func(o *decoderOptions) error {
49+
if o.resetOpt && b != o.lowMem {
50+
return errors.New("WithDecoderLowmem cannot be changed on Reset")
51+
}
52+
o.lowMem = b
53+
return nil
54+
}
4755
}
4856

4957
// WithDecoderConcurrency sets the number of created decoders.
@@ -56,16 +64,20 @@ func WithDecoderLowmem(b bool) DOption {
5664
// The value supplied must be at least 0.
5765
// When a value of 0 is provided GOMAXPROCS will be used.
5866
// By default this will be set to 4 or GOMAXPROCS, whatever is lower.
67+
// Cannot be changed with ResetWithOptions.
5968
func WithDecoderConcurrency(n int) DOption {
6069
return func(o *decoderOptions) error {
6170
if n < 0 {
6271
return errors.New("concurrency must be at least 0")
6372
}
73+
newVal := n
6474
if n == 0 {
65-
o.concurrent = runtime.GOMAXPROCS(0)
66-
} else {
67-
o.concurrent = n
75+
newVal = runtime.GOMAXPROCS(0)
6876
}
77+
if o.resetOpt && newVal != o.concurrent {
78+
return errors.New("WithDecoderConcurrency cannot be changed on Reset")
79+
}
80+
o.concurrent = newVal
6981
return nil
7082
}
7183
}
@@ -74,6 +86,7 @@ func WithDecoderConcurrency(n int) DOption {
7486
// non-streaming operations or maximum window size for streaming operations.
7587
// This can be used to control memory usage of potentially hostile content.
7688
// Maximum is 1 << 63 bytes. Default is 64GiB.
89+
// Can be changed with ResetWithOptions.
7790
func WithDecoderMaxMemory(n uint64) DOption {
7891
return func(o *decoderOptions) error {
7992
if n == 0 {
@@ -93,29 +106,37 @@ func WithDecoderMaxMemory(n uint64) DOption {
93106
// "zstd --train" from the Zstandard reference implementation.
94107
//
95108
// If several dictionaries with the same ID are provided, the last one will be used.
109+
// Can be changed with ResetWithOptions.
96110
//
97111
// [dictionary format]: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#dictionary-format
98112
func WithDecoderDicts(dicts ...[]byte) DOption {
99113
return func(o *decoderOptions) error {
114+
if o.dicts == nil {
115+
o.dicts = make(map[uint32]*dict)
116+
}
100117
for _, b := range dicts {
101118
d, err := loadDict(b)
102119
if err != nil {
103120
return err
104121
}
105-
o.dicts = append(o.dicts, d)
122+
o.dicts[d.id] = d
106123
}
107124
return nil
108125
}
109126
}
110127

111128
// WithDecoderDictRaw registers a dictionary that may be used by the decoder.
112129
// The slice content can be arbitrary data.
130+
// Can be changed with ResetWithOptions.
113131
func WithDecoderDictRaw(id uint32, content []byte) DOption {
114132
return func(o *decoderOptions) error {
115133
if bits.UintSize > 32 && uint(len(content)) > dictMaxLength {
116134
return fmt.Errorf("dictionary of size %d > 2GiB too large", len(content))
117135
}
118-
o.dicts = append(o.dicts, &dict{id: id, content: content, offsets: [3]int{1, 4, 8}})
136+
if o.dicts == nil {
137+
o.dicts = make(map[uint32]*dict)
138+
}
139+
o.dicts[id] = &dict{id: id, content: content, offsets: [3]int{1, 4, 8}}
119140
return nil
120141
}
121142
}
@@ -125,6 +146,7 @@ func WithDecoderDictRaw(id uint32, content []byte) DOption {
125146
// The Decoder will likely allocate more memory based on the WithDecoderLowmem setting.
126147
// If WithDecoderMaxMemory is set to a lower value, that will be used.
127148
// Default is 512MB, Maximum is ~3.75 TB as per zstandard spec.
149+
// Can be changed with ResetWithOptions.
128150
func WithDecoderMaxWindow(size uint64) DOption {
129151
return func(o *decoderOptions) error {
130152
if size < MinWindowSize {
@@ -142,6 +164,7 @@ func WithDecoderMaxWindow(size uint64) DOption {
142164
// or any size set in WithDecoderMaxMemory.
143165
// This can be used to limit decoding to a specific maximum output size.
144166
// Disabled by default.
167+
// Can be changed with ResetWithOptions.
145168
func WithDecodeAllCapLimit(b bool) DOption {
146169
return func(o *decoderOptions) error {
147170
o.limitToCap = b
@@ -154,17 +177,37 @@ func WithDecodeAllCapLimit(b bool) DOption {
154177
// This typically uses less allocations but will have the full decompressed object in memory.
155178
// Note that DecodeAllCapLimit will disable this, as well as giving a size of 0 or less.
156179
// Default is 128KiB.
180+
// Cannot be changed with ResetWithOptions.
157181
func WithDecodeBuffersBelow(size int) DOption {
158182
return func(o *decoderOptions) error {
183+
if o.resetOpt && size != o.decodeBufsBelow {
184+
return errors.New("WithDecodeBuffersBelow cannot be changed on Reset")
185+
}
159186
o.decodeBufsBelow = size
160187
return nil
161188
}
162189
}
163190

164191
// IgnoreChecksum allows to forcibly ignore checksum checking.
192+
// Can be changed with ResetWithOptions.
165193
func IgnoreChecksum(b bool) DOption {
166194
return func(o *decoderOptions) error {
167195
o.ignoreChecksum = b
168196
return nil
169197
}
170198
}
199+
200+
// WithDecoderDictDelete removes dictionaries by ID.
201+
// If no ids are passed, all dictionaries are deleted.
202+
// Should be used with ResetWithOptions.
203+
func WithDecoderDictDelete(ids ...uint32) DOption {
204+
return func(o *decoderOptions) error {
205+
if len(ids) == 0 {
206+
clear(o.dicts)
207+
}
208+
for _, id := range ids {
209+
delete(o.dicts, id)
210+
}
211+
return nil
212+
}
213+
}

zstd/decoder_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2213,3 +2213,129 @@ func TestWithDecodeAllCapLimit(t *testing.T) {
22132213
})
22142214
}
22152215
}
2216+
2217+
func TestDecoderResetWithOptions(t *testing.T) {
2218+
dec, err := NewReader(nil, WithDecoderConcurrency(1), WithDecoderLowmem(true))
2219+
if err != nil {
2220+
t.Fatal(err)
2221+
}
2222+
defer dec.Close()
2223+
2224+
// Test changing safe options
2225+
t.Run("change-maxmemory", func(t *testing.T) {
2226+
err := dec.ResetWithOptions(nil, WithDecoderMaxMemory(1<<20))
2227+
if err != nil {
2228+
t.Errorf("ResetWithOptions should allow changing maxMemory: %v", err)
2229+
}
2230+
})
2231+
2232+
t.Run("change-maxwindow", func(t *testing.T) {
2233+
err := dec.ResetWithOptions(nil, WithDecoderMaxWindow(1<<20))
2234+
if err != nil {
2235+
t.Errorf("ResetWithOptions should allow changing maxWindow: %v", err)
2236+
}
2237+
})
2238+
2239+
t.Run("change-ignorechecksum", func(t *testing.T) {
2240+
err := dec.ResetWithOptions(nil, IgnoreChecksum(true))
2241+
if err != nil {
2242+
t.Errorf("ResetWithOptions should allow changing ignoreChecksum: %v", err)
2243+
}
2244+
})
2245+
2246+
// Test error when changing unsafe options
2247+
t.Run("change-concurrency-error", func(t *testing.T) {
2248+
err := dec.ResetWithOptions(nil, WithDecoderConcurrency(4))
2249+
if err == nil {
2250+
t.Error("ResetWithOptions should error when changing concurrency")
2251+
}
2252+
})
2253+
2254+
t.Run("change-lowmem-error", func(t *testing.T) {
2255+
err := dec.ResetWithOptions(nil, WithDecoderLowmem(false))
2256+
if err == nil {
2257+
t.Error("ResetWithOptions should error when changing lowmem")
2258+
}
2259+
})
2260+
2261+
// Test same value is allowed
2262+
t.Run("same-concurrency-ok", func(t *testing.T) {
2263+
err := dec.ResetWithOptions(nil, WithDecoderConcurrency(1))
2264+
if err != nil {
2265+
t.Errorf("ResetWithOptions should allow same concurrency: %v", err)
2266+
}
2267+
})
2268+
}
2269+
2270+
func TestDecoderDictDelete(t *testing.T) {
2271+
dictContent := []byte("test dictionary content for decompression testing purposes")
2272+
2273+
dec, err := NewReader(nil, WithDecoderDictRaw(100, dictContent), WithDecoderDictRaw(200, dictContent))
2274+
if err != nil {
2275+
t.Fatal(err)
2276+
}
2277+
defer dec.Close()
2278+
2279+
if len(dec.o.dicts) != 2 {
2280+
t.Fatalf("expected 2 dicts, got %d", len(dec.o.dicts))
2281+
}
2282+
2283+
// Delete specific dict
2284+
err = dec.ResetWithOptions(nil, WithDecoderDictDelete(100))
2285+
if err != nil {
2286+
t.Fatal(err)
2287+
}
2288+
if len(dec.o.dicts) != 1 {
2289+
t.Errorf("expected 1 dict after delete, got %d", len(dec.o.dicts))
2290+
}
2291+
if _, ok := dec.o.dicts[200]; !ok {
2292+
t.Error("dict 200 should still exist")
2293+
}
2294+
2295+
// Add another dict
2296+
err = dec.ResetWithOptions(nil, WithDecoderDictRaw(300, dictContent))
2297+
if err != nil {
2298+
t.Fatal(err)
2299+
}
2300+
if len(dec.o.dicts) != 2 {
2301+
t.Errorf("expected 2 dicts after add, got %d", len(dec.o.dicts))
2302+
}
2303+
2304+
// Delete all dicts with no arguments
2305+
err = dec.ResetWithOptions(nil, WithDecoderDictDelete())
2306+
if err != nil {
2307+
t.Fatal(err)
2308+
}
2309+
if len(dec.o.dicts) != 0 {
2310+
t.Errorf("expected 0 dicts after delete all, got %d", len(dec.o.dicts))
2311+
}
2312+
}
2313+
2314+
func TestDecoderDictDeleteMultiple(t *testing.T) {
2315+
dictContent := []byte("test dictionary content")
2316+
2317+
dec, err := NewReader(nil,
2318+
WithDecoderDictRaw(100, dictContent),
2319+
WithDecoderDictRaw(200, dictContent),
2320+
WithDecoderDictRaw(300, dictContent))
2321+
if err != nil {
2322+
t.Fatal(err)
2323+
}
2324+
defer dec.Close()
2325+
2326+
if len(dec.o.dicts) != 3 {
2327+
t.Fatalf("expected 3 dicts, got %d", len(dec.o.dicts))
2328+
}
2329+
2330+
// Delete multiple dicts in one call
2331+
err = dec.ResetWithOptions(nil, WithDecoderDictDelete(100, 300))
2332+
if err != nil {
2333+
t.Fatal(err)
2334+
}
2335+
if len(dec.o.dicts) != 1 {
2336+
t.Errorf("expected 1 dict after delete, got %d", len(dec.o.dicts))
2337+
}
2338+
if _, ok := dec.o.dicts[200]; !ok {
2339+
t.Error("dict 200 should still exist")
2340+
}
2341+
}

zstd/encoder.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,22 @@ func (e *Encoder) Reset(w io.Writer) {
131131
s.frameContentSize = 0
132132
}
133133

134+
// ResetWithOptions will re-initialize the writer and apply the given options
135+
// as a new, independent stream.
136+
// Options are applied on top of the existing options.
137+
// Some options cannot be changed on reset and will return an error.
138+
func (e *Encoder) ResetWithOptions(w io.Writer, opts ...EOption) error {
139+
e.o.resetOpt = true
140+
defer func() { e.o.resetOpt = false }()
141+
for _, o := range opts {
142+
if err := o(&e.o); err != nil {
143+
return err
144+
}
145+
}
146+
e.Reset(w)
147+
return nil
148+
}
149+
134150
// ResetContentSize will reset and set a content size for the next stream.
135151
// If the bytes written does not match the size given an error will be returned
136152
// when calling Close().

0 commit comments

Comments
 (0)