From e4cecfe922b39f3f74a82f5ab3b675c936c4839e Mon Sep 17 00:00:00 2001 From: Thomas Gurung Date: Tue, 29 Apr 2025 23:43:38 +0100 Subject: [PATCH 1/8] Adding support to clone restricted assets --- cloudinary_cli/modules/clone.py | 131 ++++++++++++++++++++++++++++---- 1 file changed, 118 insertions(+), 13 deletions(-) diff --git a/cloudinary_cli/modules/clone.py b/cloudinary_cli/modules/clone.py index 2a45c1f..e4e3410 100644 --- a/cloudinary_cli/modules/clone.py +++ b/cloudinary_cli/modules/clone.py @@ -1,11 +1,14 @@ from click import command, option, style, argument from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit import cloudinary +from cloudinary.auth_token import _digest from cloudinary_cli.utils.utils import run_tasks_concurrently from cloudinary_cli.utils.api_utils import upload_file -from cloudinary_cli.utils.config_utils import load_config, get_cloudinary_config, config_to_dict +from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict from cloudinary_cli.defaults import logger from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination +import time +import re DEFAULT_MAX_RESULTS = 500 @@ -32,32 +35,53 @@ @option("-w", "--concurrent_workers", type=int, default=30, help="Specify the number of concurrent network threads.") @option("-fi", "--fields", multiple=True, - help="Specify whether to copy tags and/or context. Valid options: `tags,context`.") + help=("Specify whether to copy tags and/or context. " + "Valid options: `tags,context`.")) @option("-se", "--search_exp", default="", help="Define a search expression to filter the assets to clone.") @option("--async", "async_", is_flag=True, default=False, help="Clone the assets asynchronously.") @option("-nu", "--notification_url", help="Webhook notification URL.") -def clone(target, force, overwrite, concurrent_workers, fields, search_exp, async_, notification_url): +@option("-t", "--ttl", type=int, default=3600, + help=("URL expiration duration in seconds. Only relevant if cloning " + "restricted assets without providing an auth_key in which case " + "a private download URL is generated which may incur additional " + "bandwidth costs.")) +def clone(target, force, overwrite, concurrent_workers, fields, + search_exp, async_, notification_url, ttl): if not target: print_help_and_exit() target_config = get_cloudinary_config(target) if not target_config: - logger.error("The specified config does not exist or the CLOUDINARY_URL scheme provided is invalid" - " (expecting to start with 'cloudinary://').") + logger.error("The specified config does not exist or the " + "CLOUDINARY_URL scheme provided is invalid " + "(expecting to start with 'cloudinary://').") return False if cloudinary.config().cloud_name == target_config.cloud_name: - logger.error("Target environment cannot be the same as source environment.") + logger.error("Target environment cannot be the same " + "as source environment.") + return False + + source_config = config_to_dict(cloudinary.config()) + auth_token = source_config.get("auth_token") + if auth_token and not validate_authtoken(auth_token): + # It is important to validate auth_token if provided as this prevents + # customer from having to re-run the command as well as + # saving Admin API calls and time. return False source_assets = search_assets(force, search_exp) + if not source_assets: + return False upload_list = [] for r in source_assets.get('resources'): - updated_options, asset_url = process_metadata(r, overwrite, async_, notification_url, + updated_options, asset_url = process_metadata(r, overwrite, async_, + notification_url, + auth_token, ttl, normalize_list_params(fields)) updated_options.update(config_to_dict(target_config)) upload_list.append((asset_url, {**updated_options})) @@ -73,9 +97,57 @@ def clone(target, force, overwrite, concurrent_workers, fields, search_exp, asyn return True +def validate_authtoken(auth_token): + duration = auth_token.get('duration') + if not duration: + logger.error("Duration is required when using auth_token. Include " + "`auth_token[duration]=` in your config.") + return False + try: + duration = int(duration) + if duration < 0: + logger.error("Duration cannot be negative.") + return False + except (ValueError, TypeError): + logger.error("Duration must be an integer.") + return False + key = auth_token.get('key') + if not key: + logger.error("No auth_token key found. Include " + "`auth_token[key]=` in your config.") + return False + try: + _digest("", key) + except Exception as e: + logger.error(f"Auth token invalid: {e}.") + return False + return True + + def search_assets(force, search_exp): + # Prevent other unsupported types to prevent + # avoidable errors during the upload process + # and append the default types in not in the + # search expression + ALLOWED_TYPES = {"type:upload", "type:private", "type:authenticated", + "type=upload", "type=private", "type=authenticated"} + if search_exp and re.search(r"\btype\s*[:=]\s*\w+", search_exp): + exp_types = re.findall(r"\btype\s*[:=]\s*\w+", search_exp) + exp_types_cleaned = [''.join(t.split()) for t in exp_types] + unallowed_types = [t for t in exp_types_cleaned if t not in ALLOWED_TYPES] + if unallowed_types: + logger.error("Unsupported type(s) in search expression: " + f"{', '.join(unallowed_types)}. " + "Only upload/private/authenticated types allowed.") + return False + elif search_exp: + search_exp += " AND (type:upload OR type:private OR type:authenticated)" + else: + search_exp = "type:upload OR type:private OR type:authenticated" + print(search_exp) search = cloudinary.search.Search().expression(search_exp) - search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name']) + search.fields(['tags', 'context', 'access_control', + 'secure_url', 'display_name', 'format']) search.max_results(DEFAULT_MAX_RESULTS) res = execute_single_request(search, fields_to_keep="") @@ -84,12 +156,45 @@ def search_assets(force, search_exp): return res -def process_metadata(res, overwrite, async_, notification_url, copy_fields=""): +def process_metadata(res, overwrite, async_, notification_url, + auth_token, ttl, copy_fields=""): cloned_options = {} - asset_url = res.get('secure_url') - cloned_options['public_id'] = res.get('public_id') - cloned_options['type'] = res.get('type') - cloned_options['resource_type'] = res.get('resource_type') + acc_ctl = res.get('access_control') + pub_id = res.get('public_id') + del_type = res.get('type') + reso_type = res.get('resource_type') + file_format = res.get('format') + if ( + isinstance(acc_ctl, list) + and len(acc_ctl) > 0 + and isinstance(acc_ctl[0], dict) + and acc_ctl[0].get("access_type") == "token" + ): + # Generate a time-limited URL for restricted assets + # Use private url if no auth_token provided + if auth_token: + pub_id_format = (pub_id if reso_type == "raw" + else f"{pub_id}.{file_format}") + asset_url = cloudinary.utils.cloudinary_url( + pub_id_format, + type=del_type, + resource_type=reso_type, + secure=True, + sign_url=True) + else: + expiry_date = int(time.time()) + ttl + asset_url = cloudinary.utils.private_download_url( + pub_id, + file_format, + resource_type=reso_type, + type=del_type, + expires_at=expiry_date) + else: + asset_url = res.get('secure_url') + cloned_options['access_control'] = acc_ctl + cloned_options['public_id'] = pub_id + cloned_options['type'] = del_type + cloned_options['resource_type'] = reso_type cloned_options['overwrite'] = overwrite cloned_options['async'] = async_ if "tags" in copy_fields: From 39f6063cf18095231aeac3db634bcf1ac2c5f703 Mon Sep 17 00:00:00 2001 From: Thomas Gurung Date: Wed, 30 Apr 2025 10:49:14 +0100 Subject: [PATCH 2/8] Updating help note --- cloudinary_cli/modules/clone.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cloudinary_cli/modules/clone.py b/cloudinary_cli/modules/clone.py index e4e3410..e504cd3 100644 --- a/cloudinary_cli/modules/clone.py +++ b/cloudinary_cli/modules/clone.py @@ -19,7 +19,6 @@ \b Clone assets from one product environment to another with/without tags and/or context (structured metadata is not currently supported). Source will be your `CLOUDINARY_URL` environment variable but you also can specify a different source using the `-c/-C` option. -Cloning restricted assets is also not supported currently. Format: cld clone `` can be a CLOUDINARY_URL or a saved config (see `config` command) Example 1 (Copy all assets including tags and context using CLOUDINARY URL): From 0a284847f4f310dbacfec0d7debc3b863a07091d Mon Sep 17 00:00:00 2001 From: Thomas Gurung Date: Tue, 3 Jun 2025 16:43:51 +0100 Subject: [PATCH 3/8] Updated auth token validation to use py sdk, TTL will be used by both private and token urls, minor refactoring --- cloudinary_cli/modules/clone.py | 54 ++++++++++++--------------------- 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/cloudinary_cli/modules/clone.py b/cloudinary_cli/modules/clone.py index e504cd3..406c676 100644 --- a/cloudinary_cli/modules/clone.py +++ b/cloudinary_cli/modules/clone.py @@ -44,7 +44,7 @@ help="Webhook notification URL.") @option("-t", "--ttl", type=int, default=3600, help=("URL expiration duration in seconds. Only relevant if cloning " - "restricted assets without providing an auth_key in which case " + "restricted assets. If you do not provide an auth_key, " "a private download URL is generated which may incur additional " "bandwidth costs.")) def clone(target, force, overwrite, concurrent_workers, fields, @@ -64,16 +64,21 @@ def clone(target, force, overwrite, concurrent_workers, fields, "as source environment.") return False - source_config = config_to_dict(cloudinary.config()) - auth_token = source_config.get("auth_token") - if auth_token and not validate_authtoken(auth_token): + auth_token = cloudinary.config().auth_token + if auth_token: # It is important to validate auth_token if provided as this prevents # customer from having to re-run the command as well as # saving Admin API calls and time. - return False + try: + cloudinary.utils.generate_auth_token(acl="/image/*") + except Exception as e: + logger.error(f"{e} - auth_token validation failed. " + "Please double-check your auth_token parameters.") + return False source_assets = search_assets(force, search_exp) if not source_assets: + # End command if search_exp contains unsupported type(s) return False upload_list = [] @@ -85,44 +90,21 @@ def clone(target, force, overwrite, concurrent_workers, fields, updated_options.update(config_to_dict(target_config)) upload_list.append((asset_url, {**updated_options})) + source_cloud_name = cloudinary.config().cloud_name if not upload_list: - logger.error(style(f'No assets found in {cloudinary.config().cloud_name}', fg="red")) + logger.error(style('No asset(s) found in ' + f'{source_cloud_name}', fg="red")) return False - logger.info(style(f'Copying {len(upload_list)} asset(s) from {cloudinary.config().cloud_name} to {target_config.cloud_name}', fg="blue")) + logger.info(style(f'Copying {len(upload_list)} asset(s) from ' + f'{source_cloud_name} to ' + f'{target_config.cloud_name}', fg="blue")) run_tasks_concurrently(upload_file, upload_list, concurrent_workers) return True -def validate_authtoken(auth_token): - duration = auth_token.get('duration') - if not duration: - logger.error("Duration is required when using auth_token. Include " - "`auth_token[duration]=` in your config.") - return False - try: - duration = int(duration) - if duration < 0: - logger.error("Duration cannot be negative.") - return False - except (ValueError, TypeError): - logger.error("Duration must be an integer.") - return False - key = auth_token.get('key') - if not key: - logger.error("No auth_token key found. Include " - "`auth_token[key]=` in your config.") - return False - try: - _digest("", key) - except Exception as e: - logger.error(f"Auth token invalid: {e}.") - return False - return True - - def search_assets(force, search_exp): # Prevent other unsupported types to prevent # avoidable errors during the upload process @@ -143,7 +125,7 @@ def search_assets(force, search_exp): search_exp += " AND (type:upload OR type:private OR type:authenticated)" else: search_exp = "type:upload OR type:private OR type:authenticated" - print(search_exp) + search = cloudinary.search.Search().expression(search_exp) search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name', 'format']) @@ -172,12 +154,14 @@ def process_metadata(res, overwrite, async_, notification_url, # Generate a time-limited URL for restricted assets # Use private url if no auth_token provided if auth_token: + # Don't add format if asset is raw pub_id_format = (pub_id if reso_type == "raw" else f"{pub_id}.{file_format}") asset_url = cloudinary.utils.cloudinary_url( pub_id_format, type=del_type, resource_type=reso_type, + auth_token={"duration": ttl}, secure=True, sign_url=True) else: From 36d5c3d7df90d62c3791d29361759a91fd9f02d9 Mon Sep 17 00:00:00 2001 From: Constantine Nathanson Date: Wed, 2 Jul 2025 14:38:44 +0300 Subject: [PATCH 4/8] Add tests --- test/test_modules/test_cli_clone.py | 318 ++++++++++++++++++++++++++++ 1 file changed, 318 insertions(+) create mode 100644 test/test_modules/test_cli_clone.py diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py new file mode 100644 index 0000000..ec61946 --- /dev/null +++ b/test/test_modules/test_cli_clone.py @@ -0,0 +1,318 @@ +import unittest +from unittest.mock import patch, MagicMock +import re + +from cloudinary_cli.modules.clone import search_assets, process_metadata +from cloudinary_cli.defaults import logger + + +class TestCLIClone(unittest.TestCase): + + def setUp(self): + self.mock_search_result = { + 'resources': [ + { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'tags': ['tag1', 'tag2'], + 'context': {'key': 'value'}, + 'access_control': None, + 'folder': 'test_folder', + 'display_name': 'Test Asset' + } + ] + } + + @patch('cloudinary_cli.modules.clone.handle_auto_pagination') + @patch('cloudinary_cli.modules.clone.execute_single_request') + @patch('cloudinary.search.Search') + def test_search_assets_default_expression(self, mock_search_class, mock_execute, mock_pagination): + """Test search_assets with empty search expression uses default""" + mock_search = MagicMock() + mock_search_class.return_value = mock_search + mock_execute.return_value = self.mock_search_result + mock_pagination.return_value = self.mock_search_result + + result = search_assets(force=True, search_exp="") + + # Verify default search expression is used + mock_search.expression.assert_called_with("type:upload OR type:private OR type:authenticated") + self.assertEqual(result, self.mock_search_result) + + @patch('cloudinary_cli.modules.clone.handle_auto_pagination') + @patch('cloudinary_cli.modules.clone.execute_single_request') + @patch('cloudinary.search.Search') + def test_search_assets_with_custom_expression(self, mock_search_class, mock_execute, mock_pagination): + """Test search_assets appends default types to custom expression""" + mock_search = MagicMock() + mock_search_class.return_value = mock_search + mock_execute.return_value = self.mock_search_result + mock_pagination.return_value = self.mock_search_result + + result = search_assets(force=True, search_exp="tags:test") + + # Verify custom expression gets default types appended + expected_exp = "tags:test AND (type:upload OR type:private OR type:authenticated)" + mock_search.expression.assert_called_with(expected_exp) + self.assertEqual(result, self.mock_search_result) + + @patch('cloudinary_cli.modules.clone.handle_auto_pagination') + @patch('cloudinary_cli.modules.clone.execute_single_request') + @patch('cloudinary.search.Search') + def test_search_assets_with_allowed_type(self, mock_search_class, mock_execute, mock_pagination): + """Test search_assets accepts allowed types""" + mock_search = MagicMock() + mock_search_class.return_value = mock_search + mock_execute.return_value = self.mock_search_result + mock_pagination.return_value = self.mock_search_result + + result = search_assets(force=True, search_exp="type:upload") + + # Verify allowed type is accepted as-is + mock_search.expression.assert_called_with("type:upload") + self.assertEqual(result, self.mock_search_result) + + @patch('cloudinary_cli.modules.clone.logger') + def test_search_assets_with_disallowed_type(self, mock_logger): + """Test search_assets rejects disallowed types""" + result = search_assets(force=True, search_exp="type:facebook") + + # Verify error is logged and False is returned + mock_logger.error.assert_called_once() + error_call = mock_logger.error.call_args[0][0] + self.assertIn("Unsupported type(s) in search expression", error_call) + self.assertIn("facebook", error_call) + self.assertEqual(result, False) + + @patch('cloudinary_cli.modules.clone.logger') + def test_search_assets_with_mixed_types(self, mock_logger): + """Test search_assets with mix of allowed and disallowed types""" + result = search_assets(force=True, search_exp="type:upload OR type:facebook") + + # Verify error is logged for disallowed type + mock_logger.error.assert_called_once() + error_call = mock_logger.error.call_args[0][0] + self.assertIn("facebook", error_call) + # Verify that only the disallowed type is mentioned in the error part + self.assertIn("Unsupported type(s) in search expression: facebook", error_call) + self.assertEqual(result, False) + + def test_search_assets_type_validation_regex(self): + """Test the regex used for type validation""" + # Test various type formats + test_cases = [ + ("type:upload", ["upload"]), + ("type=upload", ["upload"]), + ("type: upload", ["upload"]), # with space + ("type = upload", ["upload"]), # with spaces + ("type:upload OR type:private", ["upload", "private"]), + ("tags:test AND type:authenticated", ["authenticated"]), + ] + + for search_exp, expected_types in test_cases: + with self.subTest(search_exp=search_exp): + found_types = re.findall(r"\btype\s*[:=]\s*\w+", search_exp) + cleaned_types = [''.join(t.split()) for t in found_types] + # Extract just the type names + type_names = [t.split(':')[-1].split('=')[-1] for t in cleaned_types] + self.assertEqual(sorted(type_names), sorted(expected_types)) + + def test_process_metadata_basic(self): + """Test process_metadata with basic asset""" + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': None + } + + options, url = process_metadata( + res, overwrite=True, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(url, 'https://res.cloudinary.com/test/image/upload/test_asset.jpg') + self.assertEqual(options['public_id'], 'test_asset') + self.assertEqual(options['type'], 'upload') + self.assertEqual(options['resource_type'], 'image') + self.assertEqual(options['overwrite'], True) + self.assertEqual(options['async'], False) + + def test_process_metadata_with_tags_and_context(self): + """Test process_metadata copying tags and context""" + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': None, + 'tags': ['tag1', 'tag2'], + 'context': {'key': 'value'} + } + + options, url = process_metadata( + res, overwrite=False, async_=True, notification_url='http://webhook.com', + auth_token=None, ttl=3600, copy_fields=['tags', 'context'] + ) + + self.assertEqual(options['tags'], ['tag1', 'tag2']) + self.assertEqual(options['context'], {'key': 'value'}) + self.assertEqual(options['notification_url'], 'http://webhook.com') + self.assertEqual(options['overwrite'], False) + self.assertEqual(options['async'], True) + + def test_process_metadata_with_folder(self): + """Test process_metadata with folder handling""" + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': None, + 'folder': 'test_folder' + } + + options, url = process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(options['asset_folder'], 'test_folder') + + def test_process_metadata_with_asset_folder(self): + """Test process_metadata with asset_folder""" + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': None, + 'asset_folder': 'asset_folder_test' + } + + options, url = process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(options['asset_folder'], 'asset_folder_test') + + def test_process_metadata_with_display_name(self): + """Test process_metadata with display_name""" + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': None, + 'display_name': 'Test Asset Display Name' + } + + options, url = process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(options['display_name'], 'Test Asset Display Name') + + @patch('cloudinary_cli.modules.clone.time.time') + @patch('cloudinary.utils.private_download_url') + def test_process_metadata_restricted_asset_no_auth_token(self, mock_private_url, mock_time): + """Test process_metadata with restricted asset and no auth token""" + mock_time.return_value = 1000 + mock_private_url.return_value = 'https://private.url/test_asset.jpg' + + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': [{'access_type': 'token'}] + } + + options, url = process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + # Should use private download URL + mock_private_url.assert_called_once_with( + 'test_asset', 'jpg', resource_type='image', type='upload', expires_at=4600 + ) + self.assertEqual(url, 'https://private.url/test_asset.jpg') + + @patch('cloudinary.utils.cloudinary_url') + def test_process_metadata_restricted_asset_with_auth_token(self, mock_cloudinary_url): + """Test process_metadata with restricted asset and auth token""" + mock_cloudinary_url.return_value = ('https://signed.url/test_asset.jpg', {}) + + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'access_control': [{'access_type': 'token'}] + } + + options, url = process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token={'key': 'value'}, ttl=3600, copy_fields=[] + ) + + # Should use signed URL + mock_cloudinary_url.assert_called_once_with( + 'test_asset.jpg', + type='upload', + resource_type='image', + auth_token={'duration': 3600}, + secure=True, + sign_url=True + ) + # The current implementation assigns the tuple directly, so we expect the tuple + self.assertEqual(url, ('https://signed.url/test_asset.jpg', {})) + + @patch('cloudinary.utils.cloudinary_url') + def test_process_metadata_restricted_raw_asset_with_auth_token(self, mock_cloudinary_url): + """Test process_metadata with restricted raw asset and auth token""" + mock_cloudinary_url.return_value = ('https://signed.url/test_asset', {}) + + res = { + 'public_id': 'test_asset', + 'type': 'upload', + 'resource_type': 'raw', + 'format': 'pdf', + 'secure_url': 'https://res.cloudinary.com/test/raw/upload/test_asset.pdf', + 'access_control': [{'access_type': 'token'}] + } + + options, url = process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token={'key': 'value'}, ttl=3600, copy_fields=[] + ) + + # For raw assets, should not append format to public_id + mock_cloudinary_url.assert_called_once_with( + 'test_asset', # No .pdf extension for raw assets + type='upload', + resource_type='raw', + auth_token={'duration': 3600}, + secure=True, + sign_url=True + ) + # The current implementation assigns the tuple directly, so we expect the tuple + self.assertEqual(url, ('https://signed.url/test_asset', {})) + + +if __name__ == '__main__': + unittest.main() From 36fca3fa54917f89c1fda91d74e4ee29c8442c6c Mon Sep 17 00:00:00 2001 From: Constantine Nathanson Date: Wed, 2 Jul 2025 15:08:59 +0300 Subject: [PATCH 5/8] fix test values --- test/test_modules/test_cli_clone.py | 66 +++++++++++++---------------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py index ec61946..8e9b87c 100644 --- a/test/test_modules/test_cli_clone.py +++ b/test/test_modules/test_cli_clone.py @@ -12,14 +12,13 @@ def setUp(self): self.mock_search_result = { 'resources': [ { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'tags': ['tag1', 'tag2'], 'context': {'key': 'value'}, - 'access_control': None, 'folder': 'test_folder', 'display_name': 'Test Asset' } @@ -97,7 +96,7 @@ def test_search_assets_with_mixed_types(self, mock_logger): error_call = mock_logger.error.call_args[0][0] self.assertIn("facebook", error_call) # Verify that only the disallowed type is mentioned in the error part - self.assertIn("Unsupported type(s) in search expression: facebook", error_call) + self.assertIn("Unsupported type(s) in search expression: type:facebook", error_call) self.assertEqual(result, False) def test_search_assets_type_validation_regex(self): @@ -123,12 +122,11 @@ def test_search_assets_type_validation_regex(self): def test_process_metadata_basic(self): """Test process_metadata with basic asset""" res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', - 'access_control': None + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg' } options, url = process_metadata( @@ -136,8 +134,8 @@ def test_process_metadata_basic(self): auth_token=None, ttl=3600, copy_fields=[] ) - self.assertEqual(url, 'https://res.cloudinary.com/test/image/upload/test_asset.jpg') - self.assertEqual(options['public_id'], 'test_asset') + self.assertEqual(url, 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg') + self.assertEqual(options['public_id'], 'sample') self.assertEqual(options['type'], 'upload') self.assertEqual(options['resource_type'], 'image') self.assertEqual(options['overwrite'], True) @@ -146,12 +144,11 @@ def test_process_metadata_basic(self): def test_process_metadata_with_tags_and_context(self): """Test process_metadata copying tags and context""" res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', - 'access_control': None, + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'tags': ['tag1', 'tag2'], 'context': {'key': 'value'} } @@ -170,12 +167,11 @@ def test_process_metadata_with_tags_and_context(self): def test_process_metadata_with_folder(self): """Test process_metadata with folder handling""" res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', - 'access_control': None, + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'folder': 'test_folder' } @@ -189,12 +185,11 @@ def test_process_metadata_with_folder(self): def test_process_metadata_with_asset_folder(self): """Test process_metadata with asset_folder""" res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', - 'access_control': None, + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'asset_folder': 'asset_folder_test' } @@ -208,12 +203,11 @@ def test_process_metadata_with_asset_folder(self): def test_process_metadata_with_display_name(self): """Test process_metadata with display_name""" res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', - 'access_control': None, + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'display_name': 'Test Asset Display Name' } @@ -229,14 +223,14 @@ def test_process_metadata_with_display_name(self): def test_process_metadata_restricted_asset_no_auth_token(self, mock_private_url, mock_time): """Test process_metadata with restricted asset and no auth token""" mock_time.return_value = 1000 - mock_private_url.return_value = 'https://private.url/test_asset.jpg' + mock_private_url.return_value = 'https://api.cloudinary.com/v1_1/demo/image/download?api_key=123456789012345&format=jpg&public_id=sample&signature=abcdef123456789×tamp=1234567890' res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'access_control': [{'access_type': 'token'}] } @@ -247,21 +241,21 @@ def test_process_metadata_restricted_asset_no_auth_token(self, mock_private_url, # Should use private download URL mock_private_url.assert_called_once_with( - 'test_asset', 'jpg', resource_type='image', type='upload', expires_at=4600 + 'sample', 'jpg', resource_type='image', type='upload', expires_at=4600 ) - self.assertEqual(url, 'https://private.url/test_asset.jpg') + self.assertEqual(url, 'https://api.cloudinary.com/v1_1/demo/image/download?api_key=123456789012345&format=jpg&public_id=sample&signature=abcdef123456789×tamp=1234567890') @patch('cloudinary.utils.cloudinary_url') def test_process_metadata_restricted_asset_with_auth_token(self, mock_cloudinary_url): """Test process_metadata with restricted asset and auth token""" - mock_cloudinary_url.return_value = ('https://signed.url/test_asset.jpg', {}) + mock_cloudinary_url.return_value = ('https://res.cloudinary.com/demo/image/upload/s--AbCdEfGhI--/sample.jpg', {}) res = { - 'public_id': 'test_asset', + 'public_id': 'sample', 'type': 'upload', 'resource_type': 'image', 'format': 'jpg', - 'secure_url': 'https://res.cloudinary.com/test/image/upload/test_asset.jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', 'access_control': [{'access_type': 'token'}] } @@ -272,7 +266,7 @@ def test_process_metadata_restricted_asset_with_auth_token(self, mock_cloudinary # Should use signed URL mock_cloudinary_url.assert_called_once_with( - 'test_asset.jpg', + 'sample.jpg', type='upload', resource_type='image', auth_token={'duration': 3600}, @@ -280,19 +274,19 @@ def test_process_metadata_restricted_asset_with_auth_token(self, mock_cloudinary sign_url=True ) # The current implementation assigns the tuple directly, so we expect the tuple - self.assertEqual(url, ('https://signed.url/test_asset.jpg', {})) + self.assertEqual(url, ('https://res.cloudinary.com/demo/image/upload/s--AbCdEfGhI--/sample.jpg', {})) @patch('cloudinary.utils.cloudinary_url') def test_process_metadata_restricted_raw_asset_with_auth_token(self, mock_cloudinary_url): """Test process_metadata with restricted raw asset and auth token""" - mock_cloudinary_url.return_value = ('https://signed.url/test_asset', {}) + mock_cloudinary_url.return_value = ('https://res.cloudinary.com/demo/raw/upload/s--XyZaBcDeF--/sample_document', {}) res = { - 'public_id': 'test_asset', + 'public_id': 'sample_document', 'type': 'upload', 'resource_type': 'raw', 'format': 'pdf', - 'secure_url': 'https://res.cloudinary.com/test/raw/upload/test_asset.pdf', + 'secure_url': 'https://res.cloudinary.com/demo/raw/upload/v1234567890/sample_document.pdf', 'access_control': [{'access_type': 'token'}] } @@ -303,7 +297,7 @@ def test_process_metadata_restricted_raw_asset_with_auth_token(self, mock_cloudi # For raw assets, should not append format to public_id mock_cloudinary_url.assert_called_once_with( - 'test_asset', # No .pdf extension for raw assets + 'sample_document', # No .pdf extension for raw assets type='upload', resource_type='raw', auth_token={'duration': 3600}, @@ -311,7 +305,7 @@ def test_process_metadata_restricted_raw_asset_with_auth_token(self, mock_cloudi sign_url=True ) # The current implementation assigns the tuple directly, so we expect the tuple - self.assertEqual(url, ('https://signed.url/test_asset', {})) + self.assertEqual(url, ('https://res.cloudinary.com/demo/raw/upload/s--XyZaBcDeF--/sample_document', {})) if __name__ == '__main__': From 87466e79db359f4fc9e555c1213d0ac48240e46f Mon Sep 17 00:00:00 2001 From: Constantine Nathanson Date: Wed, 2 Jul 2025 15:56:47 +0300 Subject: [PATCH 6/8] Fix tests --- test/test_modules/test_cli_clone.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py index 8e9b87c..e7adfeb 100644 --- a/test/test_modules/test_cli_clone.py +++ b/test/test_modules/test_cli_clone.py @@ -218,7 +218,7 @@ def test_process_metadata_with_display_name(self): self.assertEqual(options['display_name'], 'Test Asset Display Name') - @patch('cloudinary_cli.modules.clone.time.time') + @patch('time.time') @patch('cloudinary.utils.private_download_url') def test_process_metadata_restricted_asset_no_auth_token(self, mock_private_url, mock_time): """Test process_metadata with restricted asset and no auth token""" From d1a0eca6bbb4b916cb64c1f609e985a479fb920d Mon Sep 17 00:00:00 2001 From: Constantine Nathanson Date: Wed, 2 Jul 2025 16:11:04 +0300 Subject: [PATCH 7/8] Fix tests --- test/test_modules/test_cli_clone.py | 51 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py index e7adfeb..492fda6 100644 --- a/test/test_modules/test_cli_clone.py +++ b/test/test_modules/test_cli_clone.py @@ -1,8 +1,15 @@ import unittest from unittest.mock import patch, MagicMock import re +import importlib.util +import os + +# Get the path to the clone module and load it directly to avoid conflicts with the command object +clone_module_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'cloudinary_cli', 'modules', 'clone.py')) +spec = importlib.util.spec_from_file_location("clone_module", clone_module_path) +clone_module = importlib.util.module_from_spec(spec) +spec.loader.exec_module(clone_module) -from cloudinary_cli.modules.clone import search_assets, process_metadata from cloudinary_cli.defaults import logger @@ -25,8 +32,8 @@ def setUp(self): ] } - @patch('cloudinary_cli.modules.clone.handle_auto_pagination') - @patch('cloudinary_cli.modules.clone.execute_single_request') + @patch.object(clone_module, 'handle_auto_pagination') + @patch.object(clone_module, 'execute_single_request') @patch('cloudinary.search.Search') def test_search_assets_default_expression(self, mock_search_class, mock_execute, mock_pagination): """Test search_assets with empty search expression uses default""" @@ -35,14 +42,14 @@ def test_search_assets_default_expression(self, mock_search_class, mock_execute, mock_execute.return_value = self.mock_search_result mock_pagination.return_value = self.mock_search_result - result = search_assets(force=True, search_exp="") + result = clone_module.search_assets(force=True, search_exp="") # Verify default search expression is used mock_search.expression.assert_called_with("type:upload OR type:private OR type:authenticated") self.assertEqual(result, self.mock_search_result) - @patch('cloudinary_cli.modules.clone.handle_auto_pagination') - @patch('cloudinary_cli.modules.clone.execute_single_request') + @patch.object(clone_module, 'handle_auto_pagination') + @patch.object(clone_module, 'execute_single_request') @patch('cloudinary.search.Search') def test_search_assets_with_custom_expression(self, mock_search_class, mock_execute, mock_pagination): """Test search_assets appends default types to custom expression""" @@ -51,15 +58,15 @@ def test_search_assets_with_custom_expression(self, mock_search_class, mock_exec mock_execute.return_value = self.mock_search_result mock_pagination.return_value = self.mock_search_result - result = search_assets(force=True, search_exp="tags:test") + result = clone_module.search_assets(force=True, search_exp="tags:test") # Verify custom expression gets default types appended expected_exp = "tags:test AND (type:upload OR type:private OR type:authenticated)" mock_search.expression.assert_called_with(expected_exp) self.assertEqual(result, self.mock_search_result) - @patch('cloudinary_cli.modules.clone.handle_auto_pagination') - @patch('cloudinary_cli.modules.clone.execute_single_request') + @patch.object(clone_module, 'handle_auto_pagination') + @patch.object(clone_module, 'execute_single_request') @patch('cloudinary.search.Search') def test_search_assets_with_allowed_type(self, mock_search_class, mock_execute, mock_pagination): """Test search_assets accepts allowed types""" @@ -68,16 +75,16 @@ def test_search_assets_with_allowed_type(self, mock_search_class, mock_execute, mock_execute.return_value = self.mock_search_result mock_pagination.return_value = self.mock_search_result - result = search_assets(force=True, search_exp="type:upload") + result = clone_module.search_assets(force=True, search_exp="type:upload") # Verify allowed type is accepted as-is mock_search.expression.assert_called_with("type:upload") self.assertEqual(result, self.mock_search_result) - @patch('cloudinary_cli.modules.clone.logger') + @patch.object(clone_module, 'logger') def test_search_assets_with_disallowed_type(self, mock_logger): """Test search_assets rejects disallowed types""" - result = search_assets(force=True, search_exp="type:facebook") + result = clone_module.search_assets(force=True, search_exp="type:facebook") # Verify error is logged and False is returned mock_logger.error.assert_called_once() @@ -86,10 +93,10 @@ def test_search_assets_with_disallowed_type(self, mock_logger): self.assertIn("facebook", error_call) self.assertEqual(result, False) - @patch('cloudinary_cli.modules.clone.logger') + @patch.object(clone_module, 'logger') def test_search_assets_with_mixed_types(self, mock_logger): """Test search_assets with mix of allowed and disallowed types""" - result = search_assets(force=True, search_exp="type:upload OR type:facebook") + result = clone_module.search_assets(force=True, search_exp="type:upload OR type:facebook") # Verify error is logged for disallowed type mock_logger.error.assert_called_once() @@ -129,7 +136,7 @@ def test_process_metadata_basic(self): 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg' } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=True, async_=False, notification_url=None, auth_token=None, ttl=3600, copy_fields=[] ) @@ -153,7 +160,7 @@ def test_process_metadata_with_tags_and_context(self): 'context': {'key': 'value'} } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=True, notification_url='http://webhook.com', auth_token=None, ttl=3600, copy_fields=['tags', 'context'] ) @@ -175,7 +182,7 @@ def test_process_metadata_with_folder(self): 'folder': 'test_folder' } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=False, notification_url=None, auth_token=None, ttl=3600, copy_fields=[] ) @@ -193,7 +200,7 @@ def test_process_metadata_with_asset_folder(self): 'asset_folder': 'asset_folder_test' } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=False, notification_url=None, auth_token=None, ttl=3600, copy_fields=[] ) @@ -211,7 +218,7 @@ def test_process_metadata_with_display_name(self): 'display_name': 'Test Asset Display Name' } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=False, notification_url=None, auth_token=None, ttl=3600, copy_fields=[] ) @@ -234,7 +241,7 @@ def test_process_metadata_restricted_asset_no_auth_token(self, mock_private_url, 'access_control': [{'access_type': 'token'}] } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=False, notification_url=None, auth_token=None, ttl=3600, copy_fields=[] ) @@ -259,7 +266,7 @@ def test_process_metadata_restricted_asset_with_auth_token(self, mock_cloudinary 'access_control': [{'access_type': 'token'}] } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=False, notification_url=None, auth_token={'key': 'value'}, ttl=3600, copy_fields=[] ) @@ -290,7 +297,7 @@ def test_process_metadata_restricted_raw_asset_with_auth_token(self, mock_cloudi 'access_control': [{'access_type': 'token'}] } - options, url = process_metadata( + options, url = clone_module.process_metadata( res, overwrite=False, async_=False, notification_url=None, auth_token={'key': 'value'}, ttl=3600, copy_fields=[] ) From b67a9621ec2f7e6d9aa6ddfc75fb20a79d938069 Mon Sep 17 00:00:00 2001 From: Constantine Nathanson Date: Wed, 2 Jul 2025 23:20:24 +0300 Subject: [PATCH 8/8] small refactoring :) --- cloudinary_cli/modules/clone.py | 232 ++++++++++++++++------------ test/test_modules/test_cli_clone.py | 14 +- 2 files changed, 143 insertions(+), 103 deletions(-) diff --git a/cloudinary_cli/modules/clone.py b/cloudinary_cli/modules/clone.py index 406c676..bc48a18 100644 --- a/cloudinary_cli/modules/clone.py +++ b/cloudinary_cli/modules/clone.py @@ -11,6 +11,7 @@ import re DEFAULT_MAX_RESULTS = 500 +ALLOWED_TYPE_VALUES = ("upload", "private", "authenticated") @command("clone", @@ -49,6 +50,30 @@ "bandwidth costs.")) def clone(target, force, overwrite, concurrent_workers, fields, search_exp, async_, notification_url, ttl): + target_config, auth_token = _validate_clone_inputs(target) + if not target_config: + return False + + source_assets = search_assets(search_exp, force) + if not source_assets or not source_assets.get('resources'): + logger.error(style(f"No asset(s) found in {cloudinary.config().cloud_name}", fg="red")) + return False + + upload_list = _prepare_upload_list( + source_assets, target_config, overwrite, async_, + notification_url, auth_token, ttl, fields + ) + + logger.info(style(f"Copying {len(upload_list)} asset(s) from " + f"{cloudinary.config().cloud_name} to " + f"{target_config.cloud_name}", fg="blue")) + + run_tasks_concurrently(upload_file, upload_list, concurrent_workers) + + return True + + +def _validate_clone_inputs(target): if not target: print_help_and_exit() @@ -57,12 +82,12 @@ def clone(target, force, overwrite, concurrent_workers, fields, logger.error("The specified config does not exist or the " "CLOUDINARY_URL scheme provided is invalid " "(expecting to start with 'cloudinary://').") - return False + return None, None if cloudinary.config().cloud_name == target_config.cloud_name: logger.error("Target environment cannot be the same " "as source environment.") - return False + return None, None auth_token = cloudinary.config().auth_token if auth_token: @@ -74,13 +99,13 @@ def clone(target, force, overwrite, concurrent_workers, fields, except Exception as e: logger.error(f"{e} - auth_token validation failed. " "Please double-check your auth_token parameters.") - return False + return None, None + + return target_config, auth_token - source_assets = search_assets(force, search_exp) - if not source_assets: - # End command if search_exp contains unsupported type(s) - return False +def _prepare_upload_list(source_assets, target_config, overwrite, async_, + notification_url, auth_token, ttl, fields): upload_list = [] for r in source_assets.get('resources'): updated_options, asset_url = process_metadata(r, overwrite, async_, @@ -89,42 +114,13 @@ def clone(target, force, overwrite, concurrent_workers, fields, normalize_list_params(fields)) updated_options.update(config_to_dict(target_config)) upload_list.append((asset_url, {**updated_options})) + return upload_list - source_cloud_name = cloudinary.config().cloud_name - if not upload_list: - logger.error(style('No asset(s) found in ' - f'{source_cloud_name}', fg="red")) - return False - - logger.info(style(f'Copying {len(upload_list)} asset(s) from ' - f'{source_cloud_name} to ' - f'{target_config.cloud_name}', fg="blue")) - - run_tasks_concurrently(upload_file, upload_list, concurrent_workers) - - return True - -def search_assets(force, search_exp): - # Prevent other unsupported types to prevent - # avoidable errors during the upload process - # and append the default types in not in the - # search expression - ALLOWED_TYPES = {"type:upload", "type:private", "type:authenticated", - "type=upload", "type=private", "type=authenticated"} - if search_exp and re.search(r"\btype\s*[:=]\s*\w+", search_exp): - exp_types = re.findall(r"\btype\s*[:=]\s*\w+", search_exp) - exp_types_cleaned = [''.join(t.split()) for t in exp_types] - unallowed_types = [t for t in exp_types_cleaned if t not in ALLOWED_TYPES] - if unallowed_types: - logger.error("Unsupported type(s) in search expression: " - f"{', '.join(unallowed_types)}. " - "Only upload/private/authenticated types allowed.") - return False - elif search_exp: - search_exp += " AND (type:upload OR type:private OR type:authenticated)" - else: - search_exp = "type:upload OR type:private OR type:authenticated" +def search_assets(search_exp, force): + search_exp = _normalize_search_expression(search_exp) + if not search_exp: + return False search = cloudinary.search.Search().expression(search_exp) search.fields(['tags', 'context', 'access_control', @@ -137,64 +133,110 @@ def search_assets(force, search_exp): return res -def process_metadata(res, overwrite, async_, notification_url, - auth_token, ttl, copy_fields=""): - cloned_options = {} - acc_ctl = res.get('access_control') - pub_id = res.get('public_id') - del_type = res.get('type') +def _normalize_search_expression(search_exp): + """ + Ensures the search expression has a valid 'type' filter. + + - If no expression is given, a default is created. + - If 'type' filters exist, they are validated. + - If no 'type' filters exist, the default is appended. + """ + default_types_str = " OR ".join(f"type:{t}" for t in ALLOWED_TYPE_VALUES) + + if not search_exp: + return default_types_str + + # Use a simple regex to find all 'type' filters + found_types = re.findall(r"\btype\s*[:=]\s*(\w+)", search_exp) + + if not found_types: + # No 'type' filter found, so append the default + return f"{search_exp} AND ({default_types_str})" + + # A 'type' filter was found, so validate it + invalid_types = {t for t in found_types if t not in ALLOWED_TYPE_VALUES} + + if invalid_types: + error_msg = ", ".join(f"type:{t}" for t in invalid_types) + logger.error( + f"Unsupported type(s) in search expression: {error_msg}. " + f"Only {', '.join(ALLOWED_TYPE_VALUES)} types allowed." + ) + return None + + # All found types are valid, so return the original expression + return search_exp + + +def process_metadata(res, overwrite, async_, notification_url, auth_token, ttl, copy_fields=None): + if copy_fields is None: + copy_fields = [] + asset_url = _get_asset_url(res, auth_token, ttl) + cloned_options = _build_cloned_options(res, overwrite, async_, notification_url, copy_fields) + + return cloned_options, asset_url + + +def _get_asset_url(res, auth_token, ttl): + if not (isinstance(res.get('access_control'), list) and + len(res.get('access_control')) > 0 and + isinstance(res['access_control'][0], dict) and + res['access_control'][0].get("access_type") == "token"): + return res.get('secure_url') + reso_type = res.get('resource_type') + del_type = res.get('type') + pub_id = res.get('public_id') file_format = res.get('format') - if ( - isinstance(acc_ctl, list) - and len(acc_ctl) > 0 - and isinstance(acc_ctl[0], dict) - and acc_ctl[0].get("access_type") == "token" - ): - # Generate a time-limited URL for restricted assets - # Use private url if no auth_token provided - if auth_token: - # Don't add format if asset is raw - pub_id_format = (pub_id if reso_type == "raw" - else f"{pub_id}.{file_format}") - asset_url = cloudinary.utils.cloudinary_url( - pub_id_format, - type=del_type, - resource_type=reso_type, - auth_token={"duration": ttl}, - secure=True, - sign_url=True) - else: - expiry_date = int(time.time()) + ttl - asset_url = cloudinary.utils.private_download_url( - pub_id, - file_format, - resource_type=reso_type, - type=del_type, - expires_at=expiry_date) - else: - asset_url = res.get('secure_url') - cloned_options['access_control'] = acc_ctl - cloned_options['public_id'] = pub_id - cloned_options['type'] = del_type - cloned_options['resource_type'] = reso_type - cloned_options['overwrite'] = overwrite - cloned_options['async'] = async_ - if "tags" in copy_fields: - cloned_options['tags'] = res.get('tags') - if "context" in copy_fields: - cloned_options['context'] = res.get('context') + + if auth_token: + # Raw assets already have the format in the public_id + pub_id_format = pub_id if reso_type == "raw" else f"{pub_id}.{file_format}" + return cloudinary.utils.cloudinary_url( + pub_id_format, + type=del_type, + resource_type=reso_type, + auth_token={"duration": ttl}, + secure=True, + sign_url=True + ) + + # Use private url if no auth_token provided + return cloudinary.utils.private_download_url( + pub_id, + file_format, + resource_type=reso_type, + type=del_type, + expires_at=int(time.time()) + ttl + ) + + +def _build_cloned_options(res, overwrite, async_, notification_url, copy_fields): + # 1. Start with mandatory options + cloned_options = { + 'overwrite': overwrite, + 'async': async_, + } + + # 2. Copy fields from source asset. Some are standard, others are from user input. + fields_to_copy = {'public_id', 'type', 'resource_type', 'access_control'}.union(copy_fields) + cloned_options.update({field: res.get(field) for field in fields_to_copy}) + + # 3. Handle fields that are added only if they have a truthy value + if res.get('display_name'): + cloned_options['display_name'] = res['display_name'] + + # This is required to put the asset in the correct asset_folder + # when copying from a fixed to DF (dynamic folder) cloud as if + # you just pass a `folder` param to a DF cloud, it will append + # this to the `public_id` and we don't want this. if res.get('folder'): - # This is required to put the asset in the correct asset_folder - # when copying from a fixed to DF (dynamic folder) cloud as if - # you just pass a `folder` param to a DF cloud, it will append - # this to the `public_id` and we don't want this. - cloned_options['asset_folder'] = res.get('folder') + cloned_options['asset_folder'] = res['folder'] elif res.get('asset_folder'): - cloned_options['asset_folder'] = res.get('asset_folder') - if res.get('display_name'): - cloned_options['display_name'] = res.get('display_name') + cloned_options['asset_folder'] = res['asset_folder'] + if notification_url: cloned_options['notification_url'] = notification_url - return cloned_options, asset_url + # 4. Clean up any None values before returning + return {k: v for k, v in cloned_options.items() if v is not None} diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py index 492fda6..edc77d7 100644 --- a/test/test_modules/test_cli_clone.py +++ b/test/test_modules/test_cli_clone.py @@ -1,14 +1,12 @@ import unittest from unittest.mock import patch, MagicMock import re -import importlib.util -import os - -# Get the path to the clone module and load it directly to avoid conflicts with the command object -clone_module_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'cloudinary_cli', 'modules', 'clone.py')) -spec = importlib.util.spec_from_file_location("clone_module", clone_module_path) -clone_module = importlib.util.module_from_spec(spec) -spec.loader.exec_module(clone_module) +import sys + +# Import the modules package, which will load the clone module. +# The 'clone' name in the package is the command object, so we get the module from sys.modules. +import cloudinary_cli.modules +clone_module = sys.modules['cloudinary_cli.modules.clone'] from cloudinary_cli.defaults import logger