diff --git a/cloudinary_cli/modules/clone.py b/cloudinary_cli/modules/clone.py index 2a45c1f..bc48a18 100644 --- a/cloudinary_cli/modules/clone.py +++ b/cloudinary_cli/modules/clone.py @@ -1,13 +1,17 @@ from click import command, option, style, argument from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit import cloudinary +from cloudinary.auth_token import _digest from cloudinary_cli.utils.utils import run_tasks_concurrently from cloudinary_cli.utils.api_utils import upload_file -from cloudinary_cli.utils.config_utils import load_config, get_cloudinary_config, config_to_dict +from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict from cloudinary_cli.defaults import logger from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination +import time +import re DEFAULT_MAX_RESULTS = 500 +ALLOWED_TYPE_VALUES = ("upload", "private", "authenticated") @command("clone", @@ -16,7 +20,6 @@ \b Clone assets from one product environment to another with/without tags and/or context (structured metadata is not currently supported). Source will be your `CLOUDINARY_URL` environment variable but you also can specify a different source using the `-c/-C` option. -Cloning restricted assets is also not supported currently. Format: cld clone `` can be a CLOUDINARY_URL or a saved config (see `config` command) Example 1 (Copy all assets including tags and context using CLOUDINARY URL): @@ -32,50 +35,96 @@ @option("-w", "--concurrent_workers", type=int, default=30, help="Specify the number of concurrent network threads.") @option("-fi", "--fields", multiple=True, - help="Specify whether to copy tags and/or context. Valid options: `tags,context`.") + help=("Specify whether to copy tags and/or context. " + "Valid options: `tags,context`.")) @option("-se", "--search_exp", default="", help="Define a search expression to filter the assets to clone.") @option("--async", "async_", is_flag=True, default=False, help="Clone the assets asynchronously.") @option("-nu", "--notification_url", help="Webhook notification URL.") -def clone(target, force, overwrite, concurrent_workers, fields, search_exp, async_, notification_url): +@option("-t", "--ttl", type=int, default=3600, + help=("URL expiration duration in seconds. Only relevant if cloning " + "restricted assets. If you do not provide an auth_key, " + "a private download URL is generated which may incur additional " + "bandwidth costs.")) +def clone(target, force, overwrite, concurrent_workers, fields, + search_exp, async_, notification_url, ttl): + target_config, auth_token = _validate_clone_inputs(target) + if not target_config: + return False + + source_assets = search_assets(search_exp, force) + if not source_assets or not source_assets.get('resources'): + logger.error(style(f"No asset(s) found in {cloudinary.config().cloud_name}", fg="red")) + return False + + upload_list = _prepare_upload_list( + source_assets, target_config, overwrite, async_, + notification_url, auth_token, ttl, fields + ) + + logger.info(style(f"Copying {len(upload_list)} asset(s) from " + f"{cloudinary.config().cloud_name} to " + f"{target_config.cloud_name}", fg="blue")) + + run_tasks_concurrently(upload_file, upload_list, concurrent_workers) + + return True + + +def _validate_clone_inputs(target): if not target: print_help_and_exit() target_config = get_cloudinary_config(target) if not target_config: - logger.error("The specified config does not exist or the CLOUDINARY_URL scheme provided is invalid" - " (expecting to start with 'cloudinary://').") - return False + logger.error("The specified config does not exist or the " + "CLOUDINARY_URL scheme provided is invalid " + "(expecting to start with 'cloudinary://').") + return None, None if cloudinary.config().cloud_name == target_config.cloud_name: - logger.error("Target environment cannot be the same as source environment.") - return False - - source_assets = search_assets(force, search_exp) - + logger.error("Target environment cannot be the same " + "as source environment.") + return None, None + + auth_token = cloudinary.config().auth_token + if auth_token: + # It is important to validate auth_token if provided as this prevents + # customer from having to re-run the command as well as + # saving Admin API calls and time. + try: + cloudinary.utils.generate_auth_token(acl="/image/*") + except Exception as e: + logger.error(f"{e} - auth_token validation failed. " + "Please double-check your auth_token parameters.") + return None, None + + return target_config, auth_token + + +def _prepare_upload_list(source_assets, target_config, overwrite, async_, + notification_url, auth_token, ttl, fields): upload_list = [] for r in source_assets.get('resources'): - updated_options, asset_url = process_metadata(r, overwrite, async_, notification_url, + updated_options, asset_url = process_metadata(r, overwrite, async_, + notification_url, + auth_token, ttl, normalize_list_params(fields)) updated_options.update(config_to_dict(target_config)) upload_list.append((asset_url, {**updated_options})) + return upload_list - if not upload_list: - logger.error(style(f'No assets found in {cloudinary.config().cloud_name}', fg="red")) - return False - - logger.info(style(f'Copying {len(upload_list)} asset(s) from {cloudinary.config().cloud_name} to {target_config.cloud_name}', fg="blue")) - - run_tasks_concurrently(upload_file, upload_list, concurrent_workers) - - return True +def search_assets(search_exp, force): + search_exp = _normalize_search_expression(search_exp) + if not search_exp: + return False -def search_assets(force, search_exp): search = cloudinary.search.Search().expression(search_exp) - search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name']) + search.fields(['tags', 'context', 'access_control', + 'secure_url', 'display_name', 'format']) search.max_results(DEFAULT_MAX_RESULTS) res = execute_single_request(search, fields_to_keep="") @@ -84,29 +133,110 @@ def search_assets(force, search_exp): return res -def process_metadata(res, overwrite, async_, notification_url, copy_fields=""): - cloned_options = {} - asset_url = res.get('secure_url') - cloned_options['public_id'] = res.get('public_id') - cloned_options['type'] = res.get('type') - cloned_options['resource_type'] = res.get('resource_type') - cloned_options['overwrite'] = overwrite - cloned_options['async'] = async_ - if "tags" in copy_fields: - cloned_options['tags'] = res.get('tags') - if "context" in copy_fields: - cloned_options['context'] = res.get('context') +def _normalize_search_expression(search_exp): + """ + Ensures the search expression has a valid 'type' filter. + + - If no expression is given, a default is created. + - If 'type' filters exist, they are validated. + - If no 'type' filters exist, the default is appended. + """ + default_types_str = " OR ".join(f"type:{t}" for t in ALLOWED_TYPE_VALUES) + + if not search_exp: + return default_types_str + + # Use a simple regex to find all 'type' filters + found_types = re.findall(r"\btype\s*[:=]\s*(\w+)", search_exp) + + if not found_types: + # No 'type' filter found, so append the default + return f"{search_exp} AND ({default_types_str})" + + # A 'type' filter was found, so validate it + invalid_types = {t for t in found_types if t not in ALLOWED_TYPE_VALUES} + + if invalid_types: + error_msg = ", ".join(f"type:{t}" for t in invalid_types) + logger.error( + f"Unsupported type(s) in search expression: {error_msg}. " + f"Only {', '.join(ALLOWED_TYPE_VALUES)} types allowed." + ) + return None + + # All found types are valid, so return the original expression + return search_exp + + +def process_metadata(res, overwrite, async_, notification_url, auth_token, ttl, copy_fields=None): + if copy_fields is None: + copy_fields = [] + asset_url = _get_asset_url(res, auth_token, ttl) + cloned_options = _build_cloned_options(res, overwrite, async_, notification_url, copy_fields) + + return cloned_options, asset_url + + +def _get_asset_url(res, auth_token, ttl): + if not (isinstance(res.get('access_control'), list) and + len(res.get('access_control')) > 0 and + isinstance(res['access_control'][0], dict) and + res['access_control'][0].get("access_type") == "token"): + return res.get('secure_url') + + reso_type = res.get('resource_type') + del_type = res.get('type') + pub_id = res.get('public_id') + file_format = res.get('format') + + if auth_token: + # Raw assets already have the format in the public_id + pub_id_format = pub_id if reso_type == "raw" else f"{pub_id}.{file_format}" + return cloudinary.utils.cloudinary_url( + pub_id_format, + type=del_type, + resource_type=reso_type, + auth_token={"duration": ttl}, + secure=True, + sign_url=True + ) + + # Use private url if no auth_token provided + return cloudinary.utils.private_download_url( + pub_id, + file_format, + resource_type=reso_type, + type=del_type, + expires_at=int(time.time()) + ttl + ) + + +def _build_cloned_options(res, overwrite, async_, notification_url, copy_fields): + # 1. Start with mandatory options + cloned_options = { + 'overwrite': overwrite, + 'async': async_, + } + + # 2. Copy fields from source asset. Some are standard, others are from user input. + fields_to_copy = {'public_id', 'type', 'resource_type', 'access_control'}.union(copy_fields) + cloned_options.update({field: res.get(field) for field in fields_to_copy}) + + # 3. Handle fields that are added only if they have a truthy value + if res.get('display_name'): + cloned_options['display_name'] = res['display_name'] + + # This is required to put the asset in the correct asset_folder + # when copying from a fixed to DF (dynamic folder) cloud as if + # you just pass a `folder` param to a DF cloud, it will append + # this to the `public_id` and we don't want this. if res.get('folder'): - # This is required to put the asset in the correct asset_folder - # when copying from a fixed to DF (dynamic folder) cloud as if - # you just pass a `folder` param to a DF cloud, it will append - # this to the `public_id` and we don't want this. - cloned_options['asset_folder'] = res.get('folder') + cloned_options['asset_folder'] = res['folder'] elif res.get('asset_folder'): - cloned_options['asset_folder'] = res.get('asset_folder') - if res.get('display_name'): - cloned_options['display_name'] = res.get('display_name') + cloned_options['asset_folder'] = res['asset_folder'] + if notification_url: cloned_options['notification_url'] = notification_url - return cloned_options, asset_url + # 4. Clean up any None values before returning + return {k: v for k, v in cloned_options.items() if v is not None} diff --git a/test/test_modules/test_cli_clone.py b/test/test_modules/test_cli_clone.py new file mode 100644 index 0000000..edc77d7 --- /dev/null +++ b/test/test_modules/test_cli_clone.py @@ -0,0 +1,317 @@ +import unittest +from unittest.mock import patch, MagicMock +import re +import sys + +# Import the modules package, which will load the clone module. +# The 'clone' name in the package is the command object, so we get the module from sys.modules. +import cloudinary_cli.modules +clone_module = sys.modules['cloudinary_cli.modules.clone'] + +from cloudinary_cli.defaults import logger + + +class TestCLIClone(unittest.TestCase): + + def setUp(self): + self.mock_search_result = { + 'resources': [ + { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'tags': ['tag1', 'tag2'], + 'context': {'key': 'value'}, + 'folder': 'test_folder', + 'display_name': 'Test Asset' + } + ] + } + + @patch.object(clone_module, 'handle_auto_pagination') + @patch.object(clone_module, 'execute_single_request') + @patch('cloudinary.search.Search') + def test_search_assets_default_expression(self, mock_search_class, mock_execute, mock_pagination): + """Test search_assets with empty search expression uses default""" + mock_search = MagicMock() + mock_search_class.return_value = mock_search + mock_execute.return_value = self.mock_search_result + mock_pagination.return_value = self.mock_search_result + + result = clone_module.search_assets(force=True, search_exp="") + + # Verify default search expression is used + mock_search.expression.assert_called_with("type:upload OR type:private OR type:authenticated") + self.assertEqual(result, self.mock_search_result) + + @patch.object(clone_module, 'handle_auto_pagination') + @patch.object(clone_module, 'execute_single_request') + @patch('cloudinary.search.Search') + def test_search_assets_with_custom_expression(self, mock_search_class, mock_execute, mock_pagination): + """Test search_assets appends default types to custom expression""" + mock_search = MagicMock() + mock_search_class.return_value = mock_search + mock_execute.return_value = self.mock_search_result + mock_pagination.return_value = self.mock_search_result + + result = clone_module.search_assets(force=True, search_exp="tags:test") + + # Verify custom expression gets default types appended + expected_exp = "tags:test AND (type:upload OR type:private OR type:authenticated)" + mock_search.expression.assert_called_with(expected_exp) + self.assertEqual(result, self.mock_search_result) + + @patch.object(clone_module, 'handle_auto_pagination') + @patch.object(clone_module, 'execute_single_request') + @patch('cloudinary.search.Search') + def test_search_assets_with_allowed_type(self, mock_search_class, mock_execute, mock_pagination): + """Test search_assets accepts allowed types""" + mock_search = MagicMock() + mock_search_class.return_value = mock_search + mock_execute.return_value = self.mock_search_result + mock_pagination.return_value = self.mock_search_result + + result = clone_module.search_assets(force=True, search_exp="type:upload") + + # Verify allowed type is accepted as-is + mock_search.expression.assert_called_with("type:upload") + self.assertEqual(result, self.mock_search_result) + + @patch.object(clone_module, 'logger') + def test_search_assets_with_disallowed_type(self, mock_logger): + """Test search_assets rejects disallowed types""" + result = clone_module.search_assets(force=True, search_exp="type:facebook") + + # Verify error is logged and False is returned + mock_logger.error.assert_called_once() + error_call = mock_logger.error.call_args[0][0] + self.assertIn("Unsupported type(s) in search expression", error_call) + self.assertIn("facebook", error_call) + self.assertEqual(result, False) + + @patch.object(clone_module, 'logger') + def test_search_assets_with_mixed_types(self, mock_logger): + """Test search_assets with mix of allowed and disallowed types""" + result = clone_module.search_assets(force=True, search_exp="type:upload OR type:facebook") + + # Verify error is logged for disallowed type + mock_logger.error.assert_called_once() + error_call = mock_logger.error.call_args[0][0] + self.assertIn("facebook", error_call) + # Verify that only the disallowed type is mentioned in the error part + self.assertIn("Unsupported type(s) in search expression: type:facebook", error_call) + self.assertEqual(result, False) + + def test_search_assets_type_validation_regex(self): + """Test the regex used for type validation""" + # Test various type formats + test_cases = [ + ("type:upload", ["upload"]), + ("type=upload", ["upload"]), + ("type: upload", ["upload"]), # with space + ("type = upload", ["upload"]), # with spaces + ("type:upload OR type:private", ["upload", "private"]), + ("tags:test AND type:authenticated", ["authenticated"]), + ] + + for search_exp, expected_types in test_cases: + with self.subTest(search_exp=search_exp): + found_types = re.findall(r"\btype\s*[:=]\s*\w+", search_exp) + cleaned_types = [''.join(t.split()) for t in found_types] + # Extract just the type names + type_names = [t.split(':')[-1].split('=')[-1] for t in cleaned_types] + self.assertEqual(sorted(type_names), sorted(expected_types)) + + def test_process_metadata_basic(self): + """Test process_metadata with basic asset""" + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg' + } + + options, url = clone_module.process_metadata( + res, overwrite=True, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(url, 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg') + self.assertEqual(options['public_id'], 'sample') + self.assertEqual(options['type'], 'upload') + self.assertEqual(options['resource_type'], 'image') + self.assertEqual(options['overwrite'], True) + self.assertEqual(options['async'], False) + + def test_process_metadata_with_tags_and_context(self): + """Test process_metadata copying tags and context""" + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'tags': ['tag1', 'tag2'], + 'context': {'key': 'value'} + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=True, notification_url='http://webhook.com', + auth_token=None, ttl=3600, copy_fields=['tags', 'context'] + ) + + self.assertEqual(options['tags'], ['tag1', 'tag2']) + self.assertEqual(options['context'], {'key': 'value'}) + self.assertEqual(options['notification_url'], 'http://webhook.com') + self.assertEqual(options['overwrite'], False) + self.assertEqual(options['async'], True) + + def test_process_metadata_with_folder(self): + """Test process_metadata with folder handling""" + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'folder': 'test_folder' + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(options['asset_folder'], 'test_folder') + + def test_process_metadata_with_asset_folder(self): + """Test process_metadata with asset_folder""" + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'asset_folder': 'asset_folder_test' + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(options['asset_folder'], 'asset_folder_test') + + def test_process_metadata_with_display_name(self): + """Test process_metadata with display_name""" + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'display_name': 'Test Asset Display Name' + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + self.assertEqual(options['display_name'], 'Test Asset Display Name') + + @patch('time.time') + @patch('cloudinary.utils.private_download_url') + def test_process_metadata_restricted_asset_no_auth_token(self, mock_private_url, mock_time): + """Test process_metadata with restricted asset and no auth token""" + mock_time.return_value = 1000 + mock_private_url.return_value = 'https://api.cloudinary.com/v1_1/demo/image/download?api_key=123456789012345&format=jpg&public_id=sample&signature=abcdef123456789×tamp=1234567890' + + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'access_control': [{'access_type': 'token'}] + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token=None, ttl=3600, copy_fields=[] + ) + + # Should use private download URL + mock_private_url.assert_called_once_with( + 'sample', 'jpg', resource_type='image', type='upload', expires_at=4600 + ) + self.assertEqual(url, 'https://api.cloudinary.com/v1_1/demo/image/download?api_key=123456789012345&format=jpg&public_id=sample&signature=abcdef123456789×tamp=1234567890') + + @patch('cloudinary.utils.cloudinary_url') + def test_process_metadata_restricted_asset_with_auth_token(self, mock_cloudinary_url): + """Test process_metadata with restricted asset and auth token""" + mock_cloudinary_url.return_value = ('https://res.cloudinary.com/demo/image/upload/s--AbCdEfGhI--/sample.jpg', {}) + + res = { + 'public_id': 'sample', + 'type': 'upload', + 'resource_type': 'image', + 'format': 'jpg', + 'secure_url': 'https://res.cloudinary.com/demo/image/upload/v1234567890/sample.jpg', + 'access_control': [{'access_type': 'token'}] + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token={'key': 'value'}, ttl=3600, copy_fields=[] + ) + + # Should use signed URL + mock_cloudinary_url.assert_called_once_with( + 'sample.jpg', + type='upload', + resource_type='image', + auth_token={'duration': 3600}, + secure=True, + sign_url=True + ) + # The current implementation assigns the tuple directly, so we expect the tuple + self.assertEqual(url, ('https://res.cloudinary.com/demo/image/upload/s--AbCdEfGhI--/sample.jpg', {})) + + @patch('cloudinary.utils.cloudinary_url') + def test_process_metadata_restricted_raw_asset_with_auth_token(self, mock_cloudinary_url): + """Test process_metadata with restricted raw asset and auth token""" + mock_cloudinary_url.return_value = ('https://res.cloudinary.com/demo/raw/upload/s--XyZaBcDeF--/sample_document', {}) + + res = { + 'public_id': 'sample_document', + 'type': 'upload', + 'resource_type': 'raw', + 'format': 'pdf', + 'secure_url': 'https://res.cloudinary.com/demo/raw/upload/v1234567890/sample_document.pdf', + 'access_control': [{'access_type': 'token'}] + } + + options, url = clone_module.process_metadata( + res, overwrite=False, async_=False, notification_url=None, + auth_token={'key': 'value'}, ttl=3600, copy_fields=[] + ) + + # For raw assets, should not append format to public_id + mock_cloudinary_url.assert_called_once_with( + 'sample_document', # No .pdf extension for raw assets + type='upload', + resource_type='raw', + auth_token={'duration': 3600}, + secure=True, + sign_url=True + ) + # The current implementation assigns the tuple directly, so we expect the tuple + self.assertEqual(url, ('https://res.cloudinary.com/demo/raw/upload/s--XyZaBcDeF--/sample_document', {})) + + +if __name__ == '__main__': + unittest.main()