diff --git a/docs/source/ext/compatibility.py b/docs/source/ext/compatibility.py
index 7633c0c32..ea7e90a6a 100644
--- a/docs/source/ext/compatibility.py
+++ b/docs/source/ext/compatibility.py
@@ -16,6 +16,7 @@
         "distributed": "Distributed Jobs",
         "cancel": "Cancel Job",
         "describe": "Describe Job",
+        "workspaces": "Workspaces / Patching",
     },
 }
diff --git a/torchx/cli/test/cmd_run_test.py b/torchx/cli/test/cmd_run_test.py
index 1707003d4..babfbc225 100644
--- a/torchx/cli/test/cmd_run_test.py
+++ b/torchx/cli/test/cmd_run_test.py
@@ -160,7 +160,8 @@ def test_run_dryrun(self, mock_runner_run: MagicMock) -> None:
         self.cmd_run.run(args)
         mock_runner_run.assert_not_called()

-    def test_runopts_not_found(self) -> None:
+    @patch("torchx.runner.workspaces.WorkspaceRunner._patch_app")
+    def test_runopts_not_found(self, patch_app: MagicMock) -> None:
         args = self.parser.parse_args(
             [
                 "--dryrun",
diff --git a/torchx/runner/workspaces.py b/torchx/runner/workspaces.py
index c13054a79..7428a4ca7 100644
--- a/torchx/runner/workspaces.py
+++ b/torchx/runner/workspaces.py
@@ -103,6 +103,7 @@ def _patch_app(self, app: AppDef, scheduler: str, workspace: str) -> None:
             img = images.get(role.image)
             if not img:
                 img = sched.build_workspace_image(role.image, workspace)
+                log.info(f"built image {img} from {role.image}")
                 images[role.image] = img
             role.image = img
diff --git a/torchx/schedulers/docker_scheduler.py b/torchx/schedulers/docker_scheduler.py
index d303b0a6d..568a954c9 100644
--- a/torchx/schedulers/docker_scheduler.py
+++ b/torchx/schedulers/docker_scheduler.py
@@ -127,6 +127,7 @@ class DockerScheduler(WorkspaceScheduler):
             describe: |
                 Partial support. DockerScheduler will return job and replica
                 status but does not provide the complete original AppSpec.
+            workspaces: true
     """

     def __init__(self, session_name: str) -> None:
@@ -416,7 +417,7 @@ def build_workspace_image(self, img: str, workspace: str) -> str:
         Returns:
             The new Docker image ID.
         """
-        return _build_container_from_workspace(self._client(), img, workspace)
+        return build_container_from_workspace(self._client(), img, workspace)


 def _to_str(a: Union[str, bytes]) -> str:
@@ -467,9 +468,13 @@ def _build_context(img: str, workspace: str) -> IO[bytes]:
     return f


-def _build_container_from_workspace(
+def build_container_from_workspace(
     client: "DockerClient", img: str, workspace: str
 ) -> str:
+    """
+    build_container_from_workspace creates a new Docker image with the
+    workspace filesystem applied as a layer on top of the provided base image.
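+
+    A minimal usage sketch (the base image name and workspace path below are
+    illustrative placeholders, not part of this API):
+
+    .. code-block:: python
+
+        import docker
+
+        client = docker.from_env()
+        image_id = build_container_from_workspace(
+            client, img="python:3.9", workspace="file:///tmp/my_project"
+        )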
+ """ context = _build_context(img, workspace) try: diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py index 09a5a02ff..671c189f0 100644 --- a/torchx/schedulers/kubernetes_scheduler.py +++ b/torchx/schedulers/kubernetes_scheduler.py @@ -39,17 +39,20 @@ import warnings from dataclasses import dataclass from datetime import datetime -from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional +from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Tuple import torchx import yaml from torchx.schedulers.api import ( AppDryRunInfo, DescribeAppResponse, - Scheduler, + WorkspaceScheduler, Stream, filter_regex, ) +from torchx.schedulers.docker_scheduler import ( + build_container_from_workspace, +) from torchx.schedulers.ids import make_unique from torchx.specs.api import ( AppDef, @@ -67,6 +70,7 @@ if TYPE_CHECKING: + from docker import DockerClient from kubernetes.client import ApiClient, CustomObjectsApi from kubernetes.client.models import ( # noqa: F401 imported but unused V1Pod, @@ -294,6 +298,7 @@ def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]: @dataclass class KubernetesJob: + images_to_push: Dict[str, Tuple[str, str]] resource: Dict[str, object] def __str__(self) -> str: @@ -303,7 +308,7 @@ def __repr__(self) -> str: return str(self) -class KubernetesScheduler(Scheduler): +class KubernetesScheduler(WorkspaceScheduler): """ KubernetesScheduler is a TorchX scheduling interface to Kubernetes. @@ -347,12 +352,19 @@ class KubernetesScheduler(Scheduler): describe: | Partial support. KubernetesScheduler will return job and replica status but does not provide the complete original AppSpec. + workspaces: true """ - def __init__(self, session_name: str, client: Optional["ApiClient"] = None) -> None: + def __init__( + self, + session_name: str, + client: Optional["ApiClient"] = None, + docker_client: Optional["DockerClient"] = None, + ) -> None: super().__init__("kubernetes", session_name) self._client = client + self.__docker_client = docker_client def _api_client(self) -> "ApiClient": from kubernetes import client, config @@ -374,6 +386,15 @@ def _custom_objects_api(self) -> "CustomObjectsApi": return client.CustomObjectsApi(self._api_client()) + def _docker_client(self) -> "DockerClient": + client = self.__docker_client + if not client: + import docker + + client = docker.from_env() + self.__docker_client = client + return client + def _get_job_name_from_exception(self, e: "ApiException") -> Optional[str]: try: return json.loads(e.body)["details"]["name"] @@ -387,6 +408,22 @@ def schedule(self, dryrun_info: AppDryRunInfo[KubernetesJob]) -> str: cfg = dryrun_info._cfg assert cfg is not None, f"{dryrun_info} missing cfg" namespace = cfg.get("namespace") or "default" + + images_to_push = dryrun_info.request.images_to_push + if len(images_to_push) > 0: + client = self._docker_client() + for local, (repo, tag) in images_to_push.items(): + logger.info(f"pushing image {repo}:{tag}...") + img = client.images.get(local) + img.tag(repo, tag=tag) + for line in client.images.push(repo, tag=tag, stream=True, decode=True): + ERROR_KEY = "error" + if ERROR_KEY in line: + raise RuntimeError( + f"failed to push docker image: {line[ERROR_KEY]}" + ) + logger.info(f"docker: {line}") + resource = dryrun_info.request.resource try: resp = self._custom_objects_api().create_namespaced_custom_object( @@ -413,8 +450,32 @@ def _submit_dryrun( queue = cfg.get("queue") if not isinstance(queue, str): raise TypeError(f"config 
+        images_to_push = dryrun_info.request.images_to_push
+        if len(images_to_push) > 0:
+            client = self._docker_client()
+            for local, (repo, tag) in images_to_push.items():
+                logger.info(f"pushing image {repo}:{tag}...")
+                img = client.images.get(local)
+                img.tag(repo, tag=tag)
+                for line in client.images.push(repo, tag=tag, stream=True, decode=True):
+                    ERROR_KEY = "error"
+                    if ERROR_KEY in line:
+                        raise RuntimeError(
+                            f"failed to push docker image: {line[ERROR_KEY]}"
+                        )
+                    logger.info(f"docker: {line}")
+
         resource = dryrun_info.request.resource
         try:
             resp = self._custom_objects_api().create_namespaced_custom_object(
@@ -413,8 +450,32 @@ def _submit_dryrun(
         queue = cfg.get("queue")
         if not isinstance(queue, str):
             raise TypeError(f"config value 'queue' must be a string, got {queue}")
+
+        # map any local images to the remote image
+        images_to_push: Dict[str, Tuple[str, str]] = {}
+        for role in app.roles:
+            HASH_PREFIX = "sha256:"
+            if role.image.startswith(HASH_PREFIX):
+                image_repo = cfg.get("image_repo")
+                if not image_repo:
+                    raise KeyError(
+                        f"must specify the image repository via the `image_repo` config option to be able to upload local image {role.image}"
+                    )
+                assert isinstance(image_repo, str), "image_repo must be str"
+
+                image_hash = role.image[len(HASH_PREFIX) :]
+                remote_image = image_repo + ":" + image_hash
+                images_to_push[role.image] = (
+                    image_repo,
+                    image_hash,
+                )
+                role.image = remote_image
+
         resource = app_to_resource(app, queue)
-        req = KubernetesJob(resource=resource)
+        req = KubernetesJob(
+            resource=resource,
+            images_to_push=images_to_push,
+        )
         info = AppDryRunInfo(req, repr)
         info._app = app
         info._cfg = cfg
@@ -445,6 +506,11 @@ def run_opts(self) -> runopts:
         opts.add(
             "queue", type_=str, help="Volcano queue to schedule job in", required=True
         )
+        opts.add(
+            "image_repo",
+            type_=str,
+            help="The image repository to use when pushing patched images; must have push access. Ex: example.com/your/container",
+        )
         return opts

     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
@@ -531,6 +597,20 @@ def log_iter(
         else:
             return iterator

+    def build_workspace_image(self, img: str, workspace: str) -> str:
+        """
+        build_workspace_image creates a new image with the files in the
+        workspace overlaid on top of the base image.
+
+        Args:
+            img: a Docker image to use as a base
+            workspace: an fsspec path to a directory with contents to be overlaid
+
+        Returns:
+            The new Docker image ID.
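+
+        An illustrative sketch (the image name and workspace path are
+        placeholder values, not defaults):
+
+        .. code-block:: python
+
+            scheduler = create_scheduler("my_session")
+            image_id = scheduler.build_workspace_image(
+                "python:3.9", "memory://my_workspace"
+            )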
""" diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py index 33b80f27b..cf7bb9de9 100644 --- a/torchx/schedulers/test/kubernetes_scheduler_test.py +++ b/torchx/schedulers/test/kubernetes_scheduler_test.py @@ -10,19 +10,30 @@ from datetime import datetime from unittest.mock import MagicMock, patch +import fsspec import torchx from torchx import schedulers, specs # @manual=//torchx/schedulers:kubernetes_scheduler from torchx.schedulers import kubernetes_scheduler -from torchx.schedulers.api import DescribeAppResponse +from torchx.schedulers.api import ( + DescribeAppResponse, + AppDryRunInfo, +) +from torchx.schedulers.docker_scheduler import ( + has_docker, +) from torchx.schedulers.kubernetes_scheduler import ( app_to_resource, cleanup_str, create_scheduler, role_to_pod, + KubernetesScheduler, + KubernetesJob, ) +SKIP_DOCKER: bool = not has_docker() + def _test_app() -> specs.AppDef: trainer_role = specs.Role( @@ -222,6 +233,31 @@ def test_submit_dryrun(self) -> None: """, ) + def test_submit_dryrun_patch(self) -> None: + scheduler = create_scheduler("test") + app = _test_app() + app.roles[0].image = "sha256:testhash" + cfg = { + "queue": "testqueue", + "image_repo": "example.com/some/repo", + } + with patch( + "torchx.schedulers.kubernetes_scheduler.make_unique" + ) as make_unique_ctx: + make_unique_ctx.return_value = "app-name-42" + info = scheduler._submit_dryrun(app, cfg) + + self.assertIn("example.com/some/repo:testhash", str(info.request.resource)) + self.assertEqual( + info.request.images_to_push, + { + "sha256:testhash": ( + "example.com/some/repo", + "testhash", + ), + }, + ) + @patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object") def test_submit(self, create_namespaced_custom_object: MagicMock) -> None: create_namespaced_custom_object.return_value = { @@ -252,7 +288,7 @@ def test_submit_job_name_conflict( from kubernetes.client.rest import ApiException api_exc = ApiException(status=409, reason="Conflict") - api_exc.body = "{'details':{'name': 'test_job'}}" + api_exc.body = '{"details":{"name": "test_job"}}' create_namespaced_custom_object.side_effect = api_exc scheduler = create_scheduler("test") @@ -344,7 +380,14 @@ def test_describe_unknown( def test_runopts(self) -> None: scheduler = kubernetes_scheduler.create_scheduler("foo") runopts = scheduler.run_opts() - self.assertEqual(set(runopts._opts.keys()), {"queue", "namespace"}) + self.assertEqual( + set(runopts._opts.keys()), + { + "queue", + "namespace", + "image_repo", + }, + ) @patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object") def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> None: @@ -394,6 +437,49 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None: }, ) + def test_build_workspace_image(self) -> None: + img = MagicMock() + img.id = "testimage" + client = MagicMock() + client.images.build.return_value = (img, []) + scheduler = KubernetesScheduler( + "foo", + docker_client=client, + ) + + fs = fsspec.filesystem("memory") + fs.mkdirs("test_workspace/bar", exist_ok=True) + with fs.open("test_workspace/bar/foo.sh", "w") as f: + f.write("exit 0") + + img = scheduler.build_workspace_image( + "busybox", + "memory://test_workspace", + ) + self.assertEqual(img, "testimage") + + def test_push_patches(self) -> None: + client = MagicMock() + scheduler = KubernetesScheduler( + "foo", + client=MagicMock(), + docker_client=client, + ) + + job = KubernetesJob( + images_to_push={ 
+ "sha256:testimage": ("repo.com/img", "testimage"), + }, + resource={}, + ) + + out = scheduler.schedule(AppDryRunInfo(job, repr)) + self.assertTrue(out) + + self.assertEqual(client.images.get.call_count, 1) + self.assertEqual(client.images.get().tag.call_count, 1) + self.assertEqual(client.images.push.call_count, 1) + class KubernetesSchedulerNoImportTest(unittest.TestCase): """