Skip to content

Commit 31b8ccf

Browse files
authored
SNOW-2294562: Change GET to do multi-part parallel download for S3, Azure, and GCP (#1549)
1 parent 113f716 commit 31b8ccf

13 files changed

+514
-96
lines changed

azure_storage_client.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -265,7 +265,8 @@ func (util *snowflakeAzureClient) uploadFile(
265265
func (util *snowflakeAzureClient) nativeDownloadFile(
266266
meta *fileMetadata,
267267
fullDstFileName string,
268-
maxConcurrency int64) error {
268+
maxConcurrency int64,
269+
partSize int64) error {
269270
azureLoc, err := util.extractContainerNameAndPath(meta.stageInfo.Location)
270271
if err != nil {
271272
return err
@@ -319,7 +320,9 @@ func (util *snowflakeAzureClient) nativeDownloadFile(
319320
_, err = withCloudStorageTimeout(util.cfg, func(ctx context.Context) (any, error) {
320321
return blobClient.DownloadFile(
321322
ctx, f, &azblob.DownloadFileOptions{
322-
Concurrency: uint16(maxConcurrency)})
323+
Concurrency: uint16(maxConcurrency),
324+
BlockSize: int64Max(partSize, blob.DefaultDownloadBlockSize),
325+
})
323326
})
324327
if err != nil {
325328
return err

azure_storage_client_test.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -169,7 +169,7 @@ func TestUploadFileWithAzureUploadFailedError(t *testing.T) {
169169
overwrite: true,
170170
dstCompressionType: compressionTypes["GZIP"],
171171
options: &SnowflakeFileTransferOptions{
172-
MultiPartThreshold: dataSizeThreshold,
172+
MultiPartThreshold: multiPartThreshold,
173173
},
174174
mockAzureClient: &azureObjectAPIMock{
175175
UploadFileFunc: func(ctx context.Context, file *os.File, o *azblob.UploadFileOptions) (azblob.UploadFileResponse, error) {
@@ -228,7 +228,7 @@ func TestUploadStreamWithAzureUploadFailedError(t *testing.T) {
228228
overwrite: true,
229229
dstCompressionType: compressionTypes["GZIP"],
230230
options: &SnowflakeFileTransferOptions{
231-
MultiPartThreshold: dataSizeThreshold,
231+
MultiPartThreshold: multiPartThreshold,
232232
},
233233
mockAzureClient: &azureObjectAPIMock{
234234
UploadStreamFunc: func(ctx context.Context, body io.Reader, o *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error) {
@@ -291,7 +291,7 @@ func TestUploadFileWithAzureUploadTokenExpired(t *testing.T) {
291291
overwrite: true,
292292
dstCompressionType: compressionTypes["GZIP"],
293293
options: &SnowflakeFileTransferOptions{
294-
MultiPartThreshold: dataSizeThreshold,
294+
MultiPartThreshold: multiPartThreshold,
295295
},
296296
mockAzureClient: &azureObjectAPIMock{
297297
UploadFileFunc: func(ctx context.Context, file *os.File, o *azblob.UploadFileOptions) (azblob.UploadFileResponse, error) {
@@ -368,7 +368,7 @@ func TestUploadFileWithAzureUploadNeedsRetry(t *testing.T) {
368368
overwrite: true,
369369
dstCompressionType: compressionTypes["GZIP"],
370370
options: &SnowflakeFileTransferOptions{
371-
MultiPartThreshold: dataSizeThreshold,
371+
MultiPartThreshold: multiPartThreshold,
372372
},
373373
mockAzureClient: &azureObjectAPIMock{
374374
UploadFileFunc: func(ctx context.Context, file *os.File, o *azblob.UploadFileOptions) (azblob.UploadFileResponse, error) {
@@ -430,7 +430,7 @@ func TestDownloadOneFileToAzureFailed(t *testing.T) {
430430
srcFileName: "data1.txt.gz",
431431
localLocation: dir,
432432
options: &SnowflakeFileTransferOptions{
433-
MultiPartThreshold: dataSizeThreshold,
433+
MultiPartThreshold: multiPartThreshold,
434434
},
435435
mockAzureClient: &azureObjectAPIMock{
436436
DownloadFileFunc: func(ctx context.Context, file *os.File, o *blob.DownloadFileOptions) (int64, error) {
@@ -576,7 +576,7 @@ func TestUploadFileToAzureClientCastFail(t *testing.T) {
576576
encryptMeta: testEncryptionMeta(),
577577
overwrite: true,
578578
options: &SnowflakeFileTransferOptions{
579-
MultiPartThreshold: dataSizeThreshold,
579+
MultiPartThreshold: multiPartThreshold,
580580
},
581581
sfa: &snowflakeFileTransferAgent{
582582
sc: &snowflakeConn{

connection_configuration_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,9 @@ package gosnowflake
22

33
import (
44
"bytes"
5-
"database/sql"
65
"crypto/rand"
76
"crypto/rsa"
7+
"database/sql"
88
"fmt"
99
"io/fs"
1010
"os"

connection_util.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,11 @@ func (sc *snowflakeConn) processFileTransfer(
114114
sfa.options = op
115115
}
116116
if sfa.options.MultiPartThreshold == 0 {
117-
sfa.options.MultiPartThreshold = dataSizeThreshold
117+
sfa.options.MultiPartThreshold = multiPartThreshold
118+
// for streaming download, use a smaller default part size
119+
if sfa.commandType == downloadCommand && sfa.options.GetFileToStream {
120+
sfa.options.MultiPartThreshold = streamingMultiPartThreshold
121+
}
118122
}
119123
if err := sfa.execute(); err != nil {
120124
return nil, err

file_transfer_agent.go

Lines changed: 17 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -32,10 +32,11 @@ type (
3232
)
3333

3434
const (
35-
fileProtocol = "file://"
36-
dataSizeThreshold int64 = 64 * 1024 * 1024
37-
isWindows = runtime.GOOS == "windows"
38-
mb float64 = 1024.0 * 1024.0
35+
fileProtocol = "file://"
36+
multiPartThreshold int64 = 64 * 1024 * 1024
37+
streamingMultiPartThreshold int64 = 8 * 1024 * 1024
38+
isWindows = runtime.GOOS == "windows"
39+
mb float64 = 1024.0 * 1024.0
3940
)
4041

4142
const (
@@ -178,12 +179,19 @@ func (sfa *snowflakeFileTransferAgent) execute() error {
178179
if sfa.stageLocationType != local {
179180
sizeThreshold := sfa.options.MultiPartThreshold
180181
meta.options.MultiPartThreshold = sizeThreshold
181-
if meta.srcFileSize > sizeThreshold && sfa.commandType == uploadCommand {
182+
if sfa.commandType == uploadCommand {
183+
if meta.srcFileSize > sizeThreshold {
184+
meta.parallel = sfa.parallel
185+
largeFileMetas = append(largeFileMetas, meta)
186+
} else {
187+
meta.parallel = 1
188+
smallFileMetas = append(smallFileMetas, meta)
189+
}
190+
} else {
191+
// Enable multi-part download for all files to improve performance.
192+
// The MultiPartThreshold will be passed to the Cloud Storage Provider to determine the part size.
182193
meta.parallel = sfa.parallel
183194
largeFileMetas = append(largeFileMetas, meta)
184-
} else {
185-
meta.parallel = 1
186-
smallFileMetas = append(smallFileMetas, meta)
187195
}
188196
} else {
189197
meta.parallel = 1
@@ -196,7 +204,7 @@ func (sfa *snowflakeFileTransferAgent) execute() error {
196204
return err
197205
}
198206
} else {
199-
if err = sfa.download(smallFileMetas); err != nil {
207+
if err = sfa.download(largeFileMetas); err != nil {
200208
return err
201209
}
202210
}

file_transfer_agent_test.go

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -487,7 +487,7 @@ func TestUpdateMetadataWithPresignedUrl(t *testing.T) {
487487
srcFileName: path.Join(dir, "/test_data/data1.txt"),
488488
overwrite: true,
489489
options: &SnowflakeFileTransferOptions{
490-
MultiPartThreshold: dataSizeThreshold,
490+
MultiPartThreshold: multiPartThreshold,
491491
},
492492
}
493493

@@ -621,7 +621,7 @@ func TestUploadWhenFilesystemReadOnlyError(t *testing.T) {
621621
srcFileName: path.Join(dir, "/test_data/data1.txt"),
622622
overwrite: true,
623623
options: &SnowflakeFileTransferOptions{
624-
MultiPartThreshold: dataSizeThreshold,
624+
MultiPartThreshold: multiPartThreshold,
625625
},
626626
}
627627

@@ -798,7 +798,7 @@ func TestCustomTmpDirPath(t *testing.T) {
798798
srcFileName: uploadFile,
799799
overwrite: true,
800800
options: &SnowflakeFileTransferOptions{
801-
MultiPartThreshold: dataSizeThreshold,
801+
MultiPartThreshold: multiPartThreshold,
802802
},
803803
}
804804

@@ -817,7 +817,7 @@ func TestCustomTmpDirPath(t *testing.T) {
817817
dstFileName: downloadFile,
818818
overwrite: true,
819819
options: &SnowflakeFileTransferOptions{
820-
MultiPartThreshold: dataSizeThreshold,
820+
MultiPartThreshold: multiPartThreshold,
821821
},
822822
}
823823

@@ -884,7 +884,7 @@ func TestReadonlyTmpDirPathShouldFail(t *testing.T) {
884884
srcFileName: uploadFile,
885885
overwrite: true,
886886
options: &SnowflakeFileTransferOptions{
887-
MultiPartThreshold: dataSizeThreshold,
887+
MultiPartThreshold: multiPartThreshold,
888888
},
889889
}
890890

@@ -940,7 +940,7 @@ func testUploadDownloadOneFile(t *testing.T, isStream bool) {
940940
srcFileName: uploadFile,
941941
overwrite: true,
942942
options: &SnowflakeFileTransferOptions{
943-
MultiPartThreshold: dataSizeThreshold,
943+
MultiPartThreshold: multiPartThreshold,
944944
},
945945
requireCompress: true,
946946
}
@@ -959,8 +959,9 @@ func testUploadDownloadOneFile(t *testing.T, isStream bool) {
959959
srcFileName: "data.txt.gz",
960960
dstFileName: downloadFile,
961961
overwrite: true,
962+
parallel: int64(10),
962963
options: &SnowflakeFileTransferOptions{
963-
MultiPartThreshold: dataSizeThreshold,
964+
MultiPartThreshold: multiPartThreshold,
964965
},
965966
}
966967

0 commit comments

Comments
 (0)