Skip to content

Commit d49edcb

Browse files
authored
Transfer Manager v1 uploader optimization (#3284)
1 parent e3ef881 commit d49edcb

3 files changed

Lines changed: 181 additions & 6 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "458251ad-35f0-459c-b2d4-5d165a1d9420",
3+
"type": "feature",
4+
"description": "Optimize allocation for transfer manager v1 uploader so no extra memory is used in buffer pool for single upload.",
5+
"modules": [
6+
"feature/s3/manager"
7+
]
8+
}

feature/s3/manager/upload.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,19 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
501501
return reader, int(n), cleanup, err
502502

503503
default:
504+
if u.readerPos == 0 {
505+
r := io.LimitReader(u.in.Body, u.cfg.PartSize)
506+
firstPart, err := io.ReadAll(r)
507+
if err != nil {
508+
return nil, 0, func() {}, err
509+
}
510+
n := len(firstPart)
511+
u.readerPos += int64(n)
512+
if int64(n) < u.cfg.PartSize {
513+
return bytes.NewReader(firstPart), n, func() {}, io.EOF
514+
}
515+
return bytes.NewReader(firstPart), n, func() {}, nil
516+
}
504517
part, err := u.cfg.partPool.Get(u.ctx)
505518
if err != nil {
506519
return nil, 0, func() {}, err

feature/s3/manager/upload_test.go

Lines changed: 160 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,160 @@ func TestUploadOrderMulti(t *testing.T) {
113113
}
114114
}
115115

116+
func TestUploadOrderMultiTriggerredBySinglePartSize(t *testing.T) {
117+
c, invocations, args := s3testing.NewUploadLoggingClient(nil)
118+
u := manager.NewUploader(c)
119+
120+
resp, err := u.Upload(context.Background(), &s3.PutObjectInput{
121+
Bucket: aws.String("Bucket"),
122+
Key: aws.String("Key - value"),
123+
Body: bytes.NewBuffer(make([]byte, 5*1024*1024)),
124+
ServerSideEncryption: "aws:kms",
125+
SSEKMSKeyId: aws.String("KmsId"),
126+
ContentType: aws.String("content/type"),
127+
})
128+
129+
if err != nil {
130+
t.Errorf("Expected no error but received %v", err)
131+
}
132+
133+
if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart",
134+
"CompleteMultipartUpload"}, *invocations); len(diff) > 0 {
135+
t.Error(err)
136+
}
137+
138+
if e, a := `https://mock.amazonaws.com/key`, resp.Location; e != a {
139+
t.Errorf("expect %q, got %q", e, a)
140+
}
141+
142+
if "UPLOAD-ID" != resp.UploadID {
143+
t.Errorf("expect %q, got %q", "UPLOAD-ID", resp.UploadID)
144+
}
145+
146+
if "VERSION-ID" != *resp.VersionID {
147+
t.Errorf("expect %q, got %q", "VERSION-ID", *resp.VersionID)
148+
}
149+
150+
// Validate input values
151+
v := aws.ToString((*args)[1].(*s3.UploadPartInput).UploadId)
152+
if "UPLOAD-ID" != v {
153+
t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
154+
}
155+
v = aws.ToString((*args)[2].(*s3.CompleteMultipartUploadInput).UploadId)
156+
if "UPLOAD-ID" != v {
157+
t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
158+
}
159+
160+
parts := (*args)[2].(*s3.CompleteMultipartUploadInput).MultipartUpload.Parts
161+
162+
num := parts[0].PartNumber
163+
etag := aws.ToString(parts[0].ETag)
164+
165+
if aws.ToInt32(num) != 1 {
166+
t.Errorf("expect 1, got %d", num)
167+
}
168+
169+
if matched, err := regexp.MatchString(`^ETAG\d+$`, etag); !matched || err != nil {
170+
t.Errorf("Failed regexp expression `^ETAG\\d+$` , got %s", etag)
171+
}
172+
173+
// Custom headers
174+
cmu := (*args)[0].(*s3.CreateMultipartUploadInput)
175+
176+
if e, a := types.ServerSideEncryption("aws:kms"), cmu.ServerSideEncryption; e != a {
177+
t.Errorf("expect %q, got %q", e, a)
178+
}
179+
180+
if e, a := "KmsId", aws.ToString(cmu.SSEKMSKeyId); e != a {
181+
t.Errorf("expect %q, got %q", e, a)
182+
}
183+
184+
if e, a := "content/type", aws.ToString(cmu.ContentType); e != a {
185+
t.Errorf("expect %q, got %q", e, a)
186+
}
187+
}
188+
189+
func TestUploadOrderMultiJustExceedSinglePart(t *testing.T) {
190+
c, invocations, args := s3testing.NewUploadLoggingClient(nil)
191+
u := manager.NewUploader(c)
192+
193+
resp, err := u.Upload(context.Background(), &s3.PutObjectInput{
194+
Bucket: aws.String("Bucket"),
195+
Key: aws.String("Key - value"),
196+
Body: bytes.NewBuffer(make([]byte, 5*1024*1024+1)),
197+
ServerSideEncryption: "aws:kms",
198+
SSEKMSKeyId: aws.String("KmsId"),
199+
ContentType: aws.String("content/type"),
200+
})
201+
202+
if err != nil {
203+
t.Errorf("Expected no error but received %v", err)
204+
}
205+
206+
if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart",
207+
"CompleteMultipartUpload"}, *invocations); len(diff) > 0 {
208+
t.Error(err)
209+
}
210+
211+
if e, a := `https://mock.amazonaws.com/key`, resp.Location; e != a {
212+
t.Errorf("expect %q, got %q", e, a)
213+
}
214+
215+
if "UPLOAD-ID" != resp.UploadID {
216+
t.Errorf("expect %q, got %q", "UPLOAD-ID", resp.UploadID)
217+
}
218+
219+
if "VERSION-ID" != *resp.VersionID {
220+
t.Errorf("expect %q, got %q", "VERSION-ID", *resp.VersionID)
221+
}
222+
223+
// Validate input values
224+
225+
// UploadPart
226+
for i := 1; i < 3; i++ {
227+
v := aws.ToString((*args)[i].(*s3.UploadPartInput).UploadId)
228+
if "UPLOAD-ID" != v {
229+
t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
230+
}
231+
}
232+
233+
// CompleteMultipartUpload
234+
v := aws.ToString((*args)[3].(*s3.CompleteMultipartUploadInput).UploadId)
235+
if "UPLOAD-ID" != v {
236+
t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
237+
}
238+
239+
parts := (*args)[3].(*s3.CompleteMultipartUploadInput).MultipartUpload.Parts
240+
241+
for i := 0; i < 2; i++ {
242+
num := parts[i].PartNumber
243+
etag := aws.ToString(parts[i].ETag)
244+
245+
if int32(i+1) != aws.ToInt32(num) {
246+
t.Errorf("expect %d, got %d", i+1, num)
247+
}
248+
249+
if matched, err := regexp.MatchString(`^ETAG\d+$`, etag); !matched || err != nil {
250+
t.Errorf("Failed regexp expression `^ETAG\\d+$`")
251+
}
252+
}
253+
254+
// Custom headers
255+
cmu := (*args)[0].(*s3.CreateMultipartUploadInput)
256+
257+
if e, a := types.ServerSideEncryption("aws:kms"), cmu.ServerSideEncryption; e != a {
258+
t.Errorf("expect %q, got %q", e, a)
259+
}
260+
261+
if e, a := "KmsId", aws.ToString(cmu.SSEKMSKeyId); e != a {
262+
t.Errorf("expect %q, got %q", e, a)
263+
}
264+
265+
if e, a := "content/type", aws.ToString(cmu.ContentType); e != a {
266+
t.Errorf("expect %q, got %q", e, a)
267+
}
268+
}
269+
116270
func TestUploadOrderMultiDifferentPartSize(t *testing.T) {
117271
s, ops, args := s3testing.NewUploadLoggingClient(nil)
118272
mgr := manager.NewUploader(s, func(u *manager.Uploader) {
@@ -411,13 +565,13 @@ func TestUploadOrderMultiFailureLeaveParts(t *testing.T) {
411565
}
412566

413567
type failreader struct {
414-
times int
415-
failCount int
568+
failBytes int64
569+
readBytes int64
416570
}
417571

418572
func (f *failreader) Read(b []byte) (int, error) {
419-
f.failCount++
420-
if f.failCount >= f.times {
573+
f.readBytes += int64(len(b))
574+
if f.readBytes > f.failBytes {
421575
return 0, fmt.Errorf("random failure")
422576
}
423577
return len(b), nil
@@ -429,7 +583,7 @@ func TestUploadOrderReadFail1(t *testing.T) {
429583
_, err := mgr.Upload(context.Background(), &s3.PutObjectInput{
430584
Bucket: aws.String("Bucket"),
431585
Key: aws.String("Key"),
432-
Body: &failreader{times: 1},
586+
Body: &failreader{failBytes: 1},
433587
})
434588
if err == nil {
435589
t.Fatalf("expect error to not be nil")
@@ -452,7 +606,7 @@ func TestUploadOrderReadFail2(t *testing.T) {
452606
_, err := mgr.Upload(context.Background(), &s3.PutObjectInput{
453607
Bucket: aws.String("Bucket"),
454608
Key: aws.String("Key"),
455-
Body: &failreader{times: 2},
609+
Body: &failreader{failBytes: 5 * 1024 * 1024},
456610
})
457611
if err == nil {
458612
t.Fatalf("expect error to not be nil")

0 commit comments

Comments
 (0)