Skip to content

Commit 8ebcc1e

Browse files
Merge branch 'release-0.16.0'
* release-0.16.0: Bumping version to 0.16.0; Add test for arg removal; Validate min CRT version for config options; Use request_type to build CRT args; Edit changelog entry to feature type; Add support for max_request_concurrency; Add TransferConfig support for CRTTransferManager
2 parents e7d0e93 + cc4db16 commit 8ebcc1e

File tree

6 files changed

+213
-5
lines changed

6 files changed

+213
-5
lines changed

.changes/0.16.0.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[
2+
{
3+
"category": "``awscrt``",
4+
"description": "``CRTTransferManager`` now supports the following ``TransferConfig`` options - ``multipart_threshold``, ``multipart_chunksize``, ``max_request_concurrency``",
5+
"type": "feature"
6+
}
7+
]

CHANGELOG.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
CHANGELOG
33
=========
44

5+
0.16.0
6+
======
7+
8+
* feature:``awscrt``: ``CRTTransferManager`` now supports the following ``TransferConfig`` options - ``multipart_threshold``, ``multipart_chunksize``, ``max_request_concurrency``
9+
10+
511
0.15.0
612
======
713

s3transfer/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def __call__(self, bytes_amount):
146146
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
147147

148148
__author__ = 'Amazon Web Services'
149-
__version__ = '0.15.0'
149+
__version__ = '0.16.0'
150150

151151

152152
logger = logging.getLogger(__name__)

s3transfer/crt.py

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import logging
1414
import re
1515
import threading
16+
from collections import namedtuple
1617
from io import BytesIO
1718

1819
import awscrt.http
@@ -184,6 +185,16 @@ def _get_crt_throughput_target_gbps(provided_throughput_target_bytes=None):
184185
return target_gbps
185186

186187

188+
def _has_minimum_crt_version(minimum_version):
    """Check whether the installed ``awscrt`` meets a minimum version.

    :type minimum_version: tuple
    :param minimum_version: Version to compare against, as a tuple of
        ints, e.g. ``(0, 29, 0)``.

    :rtype: bool
    :returns: ``True`` if ``awscrt.__version__``, split on ``.`` and
        converted to ints, compares greater than or equal to
        ``minimum_version``. ``False`` if the version string cannot be
        converted (``TypeError``/``ValueError``) or is lower.
    """
    installed_version = awscrt.__version__
    try:
        installed_parts = tuple(
            int(part) for part in installed_version.split(".")
        )
    except (TypeError, ValueError):
        # A version string with non-integer components is treated as
        # not satisfying the minimum.
        return False
    return installed_parts >= minimum_version
196+
197+
187198
class CRTTransferManager:
188199
ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
189200
ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS
@@ -193,7 +204,9 @@ class CRTTransferManager:
193204

194205
_UNSUPPORTED_BUCKET_PATTERNS = TransferManager._UNSUPPORTED_BUCKET_PATTERNS
195206

196-
def __init__(self, crt_s3_client, crt_request_serializer, osutil=None):
207+
def __init__(
208+
self, crt_s3_client, crt_request_serializer, osutil=None, config=None
209+
):
197210
"""A transfer manager interface for Amazon S3 on CRT s3 client.
198211
199212
:type crt_s3_client: awscrt.s3.S3Client
@@ -207,12 +220,18 @@ def __init__(self, crt_s3_client, crt_request_serializer, osutil=None):
207220
:type osutil: s3transfer.utils.OSUtils
208221
:param osutil: OSUtils object to use for os-related behavior when
209222
using with transfer manager.
223+
224+
:type config: s3transfer.manager.TransferConfig
225+
:param config: The transfer configuration to be used when
226+
making CRT S3 client requests.
210227
"""
211228
if osutil is None:
212229
self._osutil = OSUtils()
213230
self._crt_s3_client = crt_s3_client
214231
self._s3_args_creator = S3ClientArgsCreator(
215-
crt_request_serializer, self._osutil
232+
crt_request_serializer,
233+
self._osutil,
234+
config,
216235
)
217236
self._crt_exception_translator = (
218237
crt_request_serializer.translate_crt_exception
@@ -731,10 +750,72 @@ def set_s3_request(self, s3_request):
731750
self._crt_future = self._s3_request.finished_future
732751

733752

753+
CRTConfigParameter = namedtuple('CRTConfigParameter', ['name', 'min_version'])
754+
755+
734756
class S3ClientArgsCreator:
735-
def __init__(self, crt_request_serializer, os_utils):
757+
_CRT_ARG_TO_CONFIG_PARAM = {
758+
'max_active_connections_override': CRTConfigParameter(
759+
'max_request_concurrency', (0, 29, 0)
760+
),
761+
}
762+
763+
def __init__(self, crt_request_serializer, os_utils, config=None):
    """Store the collaborators used to build CRT request arguments.

    :param crt_request_serializer: Stored as ``_request_serializer``.
    :param os_utils: Stored as ``_os_utils``.
    :param config: Optional transfer configuration, stored as
        ``_config``. Defaults to ``None``.
    """
    self._config = config
    self._os_utils = os_utils
    self._request_serializer = crt_request_serializer
767+
768+
def _get_crt_transfer_config_options(self, request_type):
769+
crt_config = {
770+
'part_size': self._config.multipart_chunksize,
771+
'max_active_connections_override': self._config.max_request_concurrency,
772+
}
773+
774+
if (
775+
self._config.get_deep_attr('multipart_chunksize')
776+
is self._config.UNSET_DEFAULT
777+
):
778+
# Let CRT dynamically calculate part size.
779+
crt_config['part_size'] = None
780+
if (
781+
self._config.get_deep_attr('max_request_concurrency')
782+
is self._config.UNSET_DEFAULT
783+
):
784+
crt_config['max_active_connections_override'] = None
785+
786+
if hasattr(self, f'_get_crt_options_{request_type}'):
787+
crt_config.update(
788+
getattr(self, f'_get_crt_options_{request_type}')()
789+
)
790+
self._remove_param_if_not_min_crt_version(crt_config)
791+
return crt_config
792+
793+
def _get_crt_options_put_object(self):
794+
return {'multipart_upload_threshold': self._config.multipart_threshold}
795+
796+
def _remove_param_if_not_min_crt_version(self, crt_config):
    """Drop CRT arguments that the installed CRT version cannot accept.

    Only arguments listed in ``_CRT_ARG_TO_CONFIG_PARAM`` are checked.
    Each is removed from ``crt_config`` in place when
    ``_has_minimum_crt_version`` reports its minimum version is not met.

    :type crt_config: dict
    :param crt_config: CRT request options; mutated in place.
    """
    unsupported_args = []
    for crt_arg in crt_config:
        param = self._CRT_ARG_TO_CONFIG_PARAM.get(crt_arg)
        if param is None or _has_minimum_crt_version(param.min_version):
            continue
        # Warn only when the user explicitly set the transfer config
        # parameter; a value left at the sentinel is dropped silently.
        explicitly_set = (
            self._config.get_deep_attr(param.name)
            is not self._config.UNSET_DEFAULT
        )
        if explicitly_set:
            min_ver_str = '.'.join(map(str, param.min_version))
            logger.warning(
                f'Transfer config parameter {param.name} '
                f'requires minimum CRT version: {min_ver_str}. '
                f'{param.name} will not be used in the request.'
            )
        unsupported_args.append(crt_arg)
    # Removal is deferred so the dict is not resized while iterating.
    for crt_arg in unsupported_args:
        crt_config.pop(crt_arg)
738819

739820
def get_make_request_args(
740821
self, request_type, call_args, coordinator, future, on_done_after_calls
@@ -823,6 +904,10 @@ def _get_make_request_args_put_object(
823904
)
824905
make_request_args['send_filepath'] = send_filepath
825906
make_request_args['checksum_config'] = checksum_config
907+
if self._config is not None:
908+
make_request_args.update(
909+
self._get_crt_transfer_config_options(request_type)
910+
)
826911
return make_request_args
827912

828913
def _get_make_request_args_get_object(
@@ -859,6 +944,10 @@ def _get_make_request_args_get_object(
859944
make_request_args['recv_filepath'] = recv_filepath
860945
make_request_args['on_body'] = on_body
861946
make_request_args['checksum_config'] = checksum_config
947+
if self._config is not None:
948+
make_request_args.update(
949+
self._get_crt_transfer_config_options(request_type)
950+
)
862951
return make_request_args
863952

864953
def _default_get_make_request_args(

s3transfer/manager.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050

5151

5252
class TransferConfig:
53+
UNSET_DEFAULT = object()
54+
5355
def __init__(
5456
self,
5557
multipart_threshold=8 * MB,
@@ -152,12 +154,19 @@ def __init__(
152154

153155
def _validate_attrs_are_nonzero(self):
154156
for attr, attr_val in self.__dict__.items():
155-
if attr_val is not None and attr_val <= 0:
157+
if (
158+
attr_val is not None
159+
and attr_val is not self.UNSET_DEFAULT
160+
and attr_val <= 0
161+
):
156162
raise ValueError(
157163
f'Provided parameter {attr} of value {attr_val} must '
158164
'be greater than 0.'
159165
)
160166

167+
def get_deep_attr(self, item):
    """Look up ``item`` using ``object.__getattribute__`` directly.

    Calling the base implementation bypasses any ``__getattribute__``
    override defined on this class or a subclass.

    :type item: str
    :param item: Name of the attribute to fetch.

    :returns: The attribute value found by the base lookup.
    """
    base_lookup = object.__getattribute__
    return base_lookup(self, item)
169+
161170

162171
class TransferManager:
163172
ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

tests/functional/test_crt.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
from botocore.session import Session
2020

21+
from s3transfer.constants import MB
22+
from s3transfer.manager import TransferConfig
2123
from s3transfer.subscribers import BaseSubscriber
2224
from tests import (
2325
HAS_CRT,
@@ -769,3 +771,98 @@ def test_crt_s3_client_error_handling(self):
769771
)
770772
with self.assertRaises(awscrt.exceptions.AwsCrtError):
771773
future.result()
774+
775+
def test_transfer_config_used_in_upload_request(self):
    """Explicit config values reach ``make_request`` for an upload."""
    transfer_config = TransferConfig(
        multipart_threshold=4 * MB,
        multipart_chunksize=2 * MB,
        max_request_concurrency=100,
    )
    manager = s3transfer.crt.CRTTransferManager(
        crt_s3_client=self.s3_crt_client,
        crt_request_serializer=self.request_serializer,
        config=transfer_config,
    )
    manager.upload(self.filename, self.bucket, self.key, {}, []).result()

    # Index 1 of call_args holds the keyword arguments of the last call.
    request_kwargs = self.s3_crt_client.make_request.call_args[1]
    assert request_kwargs['multipart_upload_threshold'] == 4 * MB
    assert request_kwargs['part_size'] == 2 * MB
    assert request_kwargs['max_active_connections_override'] == 100
795+
796+
def test_transfer_config_used_in_download_request(self):
    """Explicit config values reach ``make_request`` for a download."""
    transfer_config = TransferConfig(
        multipart_threshold=4 * MB,
        multipart_chunksize=2 * MB,
        max_request_concurrency=100,
    )
    manager = s3transfer.crt.CRTTransferManager(
        crt_s3_client=self.s3_crt_client,
        crt_request_serializer=self.request_serializer,
        config=transfer_config,
    )
    manager.download(
        self.bucket, self.key, self.filename, {}, []
    ).result()

    # Index 1 of call_args holds the keyword arguments of the last call.
    request_kwargs = self.s3_crt_client.make_request.call_args[1]
    assert request_kwargs['part_size'] == 2 * MB
    assert request_kwargs['max_active_connections_override'] == 100
    # Config option only used for PUT requests.
    assert 'multipart_upload_threshold' not in request_kwargs
817+
818+
def test_unset_part_size_defaults_to_none_in_upload_request(self):
    """An unset ``multipart_chunksize`` is sent as ``part_size=None``."""
    transfer_config = TransferConfig(
        multipart_chunksize=TransferConfig.UNSET_DEFAULT,
    )
    manager = s3transfer.crt.CRTTransferManager(
        crt_s3_client=self.s3_crt_client,
        crt_request_serializer=self.request_serializer,
        config=transfer_config,
    )
    manager.upload(self.filename, self.bucket, self.key, {}, []).result()

    request_kwargs = self.s3_crt_client.make_request.call_args[1]
    assert request_kwargs['part_size'] is None
834+
835+
def test_unset_max_concurrency_defaults_to_none(self):
    """An unset ``max_request_concurrency`` is sent as ``None``."""
    transfer_config = TransferConfig(
        max_request_concurrency=TransferConfig.UNSET_DEFAULT,
    )
    manager = s3transfer.crt.CRTTransferManager(
        crt_s3_client=self.s3_crt_client,
        crt_request_serializer=self.request_serializer,
        config=transfer_config,
    )
    manager.upload(self.filename, self.bucket, self.key, {}, []).result()

    request_kwargs = self.s3_crt_client.make_request.call_args[1]
    assert request_kwargs['max_active_connections_override'] is None
851+
852+
@mock.patch('awscrt.__version__', '0.28.0')
def test_args_removed_if_not_min_awscrt_version(self):
    """Version-gated args are dropped when awscrt is too old.

    ``awscrt.__version__`` is patched to ``0.28.0`` for this test, and
    the request must then carry no ``max_active_connections_override``.
    """
    transfer_config = TransferConfig(
        max_request_concurrency=TransferConfig.UNSET_DEFAULT,
    )
    manager = s3transfer.crt.CRTTransferManager(
        crt_s3_client=self.s3_crt_client,
        crt_request_serializer=self.request_serializer,
        config=transfer_config,
    )
    manager.upload(self.filename, self.bucket, self.key, {}, []).result()

    request_kwargs = self.s3_crt_client.make_request.call_args[1]
    assert 'max_active_connections_override' not in request_kwargs

0 commit comments

Comments
 (0)