1313import logging
1414import re
1515import threading
16+ from collections import namedtuple
1617from io import BytesIO
1718
1819import awscrt .http
@@ -184,6 +185,16 @@ def _get_crt_throughput_target_gbps(provided_throughput_target_bytes=None):
184185 return target_gbps
185186
186187
188+ def _has_minimum_crt_version (minimum_version ):
189+ crt_version_str = awscrt .__version__
190+ try :
191+ crt_version_ints = map (int , crt_version_str .split ("." ))
192+ crt_version_tuple = tuple (crt_version_ints )
193+ except (TypeError , ValueError ):
194+ return False
195+ return crt_version_tuple >= minimum_version
196+
197+
187198class CRTTransferManager :
188199 ALLOWED_DOWNLOAD_ARGS = TransferManager .ALLOWED_DOWNLOAD_ARGS
189200 ALLOWED_UPLOAD_ARGS = TransferManager .ALLOWED_UPLOAD_ARGS
@@ -193,7 +204,9 @@ class CRTTransferManager:
193204
194205 _UNSUPPORTED_BUCKET_PATTERNS = TransferManager ._UNSUPPORTED_BUCKET_PATTERNS
195206
196- def __init__ (self , crt_s3_client , crt_request_serializer , osutil = None ):
207+ def __init__ (
208+ self , crt_s3_client , crt_request_serializer , osutil = None , config = None
209+ ):
197210 """A transfer manager interface for Amazon S3 on CRT s3 client.
198211
199212 :type crt_s3_client: awscrt.s3.S3Client
@@ -207,12 +220,18 @@ def __init__(self, crt_s3_client, crt_request_serializer, osutil=None):
207220 :type osutil: s3transfer.utils.OSUtils
208221 :param osutil: OSUtils object to use for os-related behavior when
209222 using with transfer manager.
223+
224+ :type config: s3transfer.manager.TransferConfig
225+ :param config: The transfer configuration to be used when
226+ making CRT S3 client requests.
210227 """
211228 if osutil is None :
212229 self ._osutil = OSUtils ()
213230 self ._crt_s3_client = crt_s3_client
214231 self ._s3_args_creator = S3ClientArgsCreator (
215- crt_request_serializer , self ._osutil
232+ crt_request_serializer ,
233+ self ._osutil ,
234+ config ,
216235 )
217236 self ._crt_exception_translator = (
218237 crt_request_serializer .translate_crt_exception
@@ -731,10 +750,72 @@ def set_s3_request(self, s3_request):
731750 self ._crt_future = self ._s3_request .finished_future
732751
733752
753+ CRTConfigParameter = namedtuple ('CRTConfigParameter' , ['name' , 'min_version' ])
754+
755+
734756class S3ClientArgsCreator :
735- def __init__ (self , crt_request_serializer , os_utils ):
757+ _CRT_ARG_TO_CONFIG_PARAM = {
758+ 'max_active_connections_override' : CRTConfigParameter (
759+ 'max_request_concurrency' , (0 , 29 , 0 )
760+ ),
761+ }
762+
763+ def __init__ (self , crt_request_serializer , os_utils , config = None ):
736764 self ._request_serializer = crt_request_serializer
737765 self ._os_utils = os_utils
766+ self ._config = config
767+
768+ def _get_crt_transfer_config_options (self , request_type ):
769+ crt_config = {
770+ 'part_size' : self ._config .multipart_chunksize ,
771+ 'max_active_connections_override' : self ._config .max_request_concurrency ,
772+ }
773+
774+ if (
775+ self ._config .get_deep_attr ('multipart_chunksize' )
776+ is self ._config .UNSET_DEFAULT
777+ ):
778+ # Let CRT dynamically calculate part size.
779+ crt_config ['part_size' ] = None
780+ if (
781+ self ._config .get_deep_attr ('max_request_concurrency' )
782+ is self ._config .UNSET_DEFAULT
783+ ):
784+ crt_config ['max_active_connections_override' ] = None
785+
786+ if hasattr (self , f'_get_crt_options_{ request_type } ' ):
787+ crt_config .update (
788+ getattr (self , f'_get_crt_options_{ request_type } ' )()
789+ )
790+ self ._remove_param_if_not_min_crt_version (crt_config )
791+ return crt_config
792+
793+ def _get_crt_options_put_object (self ):
794+ return {'multipart_upload_threshold' : self ._config .multipart_threshold }
795+
796+ def _remove_param_if_not_min_crt_version (self , crt_config ):
797+ to_remove = []
798+ for request_arg in crt_config :
799+ if request_arg not in self ._CRT_ARG_TO_CONFIG_PARAM :
800+ continue
801+ param = self ._CRT_ARG_TO_CONFIG_PARAM [request_arg ]
802+ if _has_minimum_crt_version (param .min_version ):
803+ continue
804+ # Only log the warning if user attempted to explicitly
805+ # use the transfer config parameter.
806+ if (
807+ self ._config .get_deep_attr (param .name )
808+ is not self ._config .UNSET_DEFAULT
809+ ):
810+ min_ver_str = '.' .join (str (i ) for i in param .min_version )
811+ logger .warning (
812+ f'Transfer config parameter { param .name } '
813+ f'requires minimum CRT version: { min_ver_str } . '
814+ f'{ param .name } will not be used in the request.'
815+ )
816+ to_remove .append (request_arg )
817+ for request_arg in to_remove :
818+ del crt_config [request_arg ]
738819
739820 def get_make_request_args (
740821 self , request_type , call_args , coordinator , future , on_done_after_calls
@@ -823,6 +904,10 @@ def _get_make_request_args_put_object(
823904 )
824905 make_request_args ['send_filepath' ] = send_filepath
825906 make_request_args ['checksum_config' ] = checksum_config
907+ if self ._config is not None :
908+ make_request_args .update (
909+ self ._get_crt_transfer_config_options (request_type )
910+ )
826911 return make_request_args
827912
828913 def _get_make_request_args_get_object (
@@ -859,6 +944,10 @@ def _get_make_request_args_get_object(
859944 make_request_args ['recv_filepath' ] = recv_filepath
860945 make_request_args ['on_body' ] = on_body
861946 make_request_args ['checksum_config' ] = checksum_config
947+ if self ._config is not None :
948+ make_request_args .update (
949+ self ._get_crt_transfer_config_options (request_type )
950+ )
862951 return make_request_args
863952
864953 def _default_get_make_request_args (
0 commit comments