1313import logging
1414import re
1515import threading
16+ from collections import namedtuple
1617from io import BytesIO
1718
1819import awscrt .http
@@ -184,6 +185,16 @@ def _get_crt_throughput_target_gbps(provided_throughput_target_bytes=None):
184185 return target_gbps
185186
186187
188+ def _has_minimum_crt_version (minimum_version ):
189+ crt_version_str = awscrt .__version__
190+ try :
191+ crt_version_ints = map (int , crt_version_str .split ("." ))
192+ crt_version_tuple = tuple (crt_version_ints )
193+ except (TypeError , ValueError ):
194+ return False
195+ return crt_version_tuple >= minimum_version
196+
197+
187198class CRTTransferManager :
188199 ALLOWED_DOWNLOAD_ARGS = TransferManager .ALLOWED_DOWNLOAD_ARGS
189200 ALLOWED_UPLOAD_ARGS = TransferManager .ALLOWED_UPLOAD_ARGS
@@ -739,18 +750,25 @@ def set_s3_request(self, s3_request):
739750 self ._crt_future = self ._s3_request .finished_future
740751
741752
753+ CRTConfigParameter = namedtuple ('CRTConfigParameter' , ['name' , 'min_version' ])
754+
755+
742756class S3ClientArgsCreator :
757+ _CRT_ARG_TO_CONFIG_PARAM = {
758+ 'max_active_connections_override' : CRTConfigParameter (
759+ 'max_request_concurrency' , (0 , 29 , 0 )
760+ ),
761+ }
762+
743763 def __init__ (self , crt_request_serializer , os_utils , config = None ):
744764 self ._request_serializer = crt_request_serializer
745765 self ._os_utils = os_utils
746766 self ._config = config
747767
748768 def _get_crt_transfer_config_options (self , request_type ):
749- part_size = self ._config .multipart_chunksize
750- max_connections = self ._config .max_request_concurrency
751769 crt_config = {
752- 'part_size' : part_size ,
753- 'max_active_connections_override' : max_connections ,
770+ 'part_size' : self . _config . multipart_chunksize ,
771+ 'max_active_connections_override' : self . _config . max_request_concurrency ,
754772 }
755773
756774 if (
@@ -769,11 +787,35 @@ def _get_crt_transfer_config_options(self, request_type):
769787 crt_config .update (
770788 getattr (self , f'_get_crt_options_{ request_type } ' )()
771789 )
790+
791+ self ._remove_param_if_not_min_crt_version (crt_config )
772792 return crt_config
773793
774794 def _get_crt_options_put_object (self ):
775795 return {'multipart_upload_threshold' : self ._config .multipart_threshold }
776796
797+ def _remove_param_if_not_min_crt_version (self , crt_config ):
798+ to_remove = []
799+ for request_arg in crt_config :
800+ if request_arg not in self ._CRT_ARG_TO_CONFIG_PARAM :
801+ continue
802+ param = self ._CRT_ARG_TO_CONFIG_PARAM [request_arg ]
803+ if _has_minimum_crt_version (param .min_version ):
804+ continue
805+ if (
806+ self ._config .get_deep_attr (param .name )
807+ is not self ._config .UNSET_DEFAULT
808+ ):
809+ min_ver_str = '.' .join (str (i ) for i in param .min_version )
810+ logger .warning (
811+ f'Transfer config parameter { param .name } '
812+ f'requires minimum CRT version: { min_ver_str } . '
813+ f'{ param .name } will not be used in the request.'
814+ )
815+ to_remove .append (request_arg )
816+ for request_arg in to_remove :
817+ del crt_config [request_arg ]
818+
777819 def get_make_request_args (
778820 self , request_type , call_args , coordinator , future , on_done_after_calls
779821 ):
0 commit comments