schedulers/kubernetes_scheduler: add workspace/patching support #384

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions docs/source/ext/compatibility.py
@@ -16,6 +16,7 @@
         "distributed": "Distributed Jobs",
         "cancel": "Cancel Job",
         "describe": "Describe Job",
+        "workspaces": "Workspaces / Patching",
     },
 }

3 changes: 2 additions & 1 deletion torchx/cli/test/cmd_run_test.py
@@ -160,7 +160,8 @@ def test_run_dryrun(self, mock_runner_run: MagicMock) -> None:
         self.cmd_run.run(args)
         mock_runner_run.assert_not_called()

-    def test_runopts_not_found(self) -> None:
+    @patch("torchx.runner.workspaces.WorkspaceRunner._patch_app")
+    def test_runopts_not_found(self, patch_app: MagicMock) -> None:
         args = self.parser.parse_args(
             [
                 "--dryrun",

1 change: 1 addition & 0 deletions torchx/runner/workspaces.py
@@ -103,6 +103,7 @@ def _patch_app(self, app: AppDef, scheduler: str, workspace: str) -> None:
             img = images.get(role.image)
             if not img:
                 img = sched.build_workspace_image(role.image, workspace)
+                log.info(f"built image {img} from {role.image}")
             images[role.image] = img
             role.image = img

9 changes: 7 additions & 2 deletions torchx/schedulers/docker_scheduler.py
@@ -127,6 +127,7 @@ class DockerScheduler(WorkspaceScheduler):
             describe: |
                 Partial support. DockerScheduler will return job and replica
                 status but does not provide the complete original AppSpec.
+            workspaces: true
     """

     def __init__(self, session_name: str) -> None:
@@ -416,7 +417,7 @@ def build_workspace_image(self, img: str, workspace: str) -> str:
         Returns:
             The new Docker image ID.
         """
-        return _build_container_from_workspace(self._client(), img, workspace)
+        return build_container_from_workspace(self._client(), img, workspace)


 def _to_str(a: Union[str, bytes]) -> str:
@@ -467,9 +468,13 @@ def _build_context(img: str, workspace: str) -> IO[bytes]:
     return f


-def _build_container_from_workspace(
+def build_container_from_workspace(
     client: "DockerClient", img: str, workspace: str
 ) -> str:
+    """
+    build_container_from_workspace creates a new Docker image with the
+    workspace filesystem applied as a layer on top of the provided base image.
+    """
     context = _build_context(img, workspace)

     try:

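Since this diff makes the helper public, it can also be called directly. A minimal sketch of such a call (assuming a running local Docker daemon; the base image name and workspace path are illustrative):

```python
import docker

from torchx.schedulers.docker_scheduler import build_container_from_workspace

# connect to the local Docker daemon
client = docker.from_env()

# overlay the contents of ./my_workspace on top of the busybox base image;
# returns the ID of the newly built image
image_id = build_container_from_workspace(client, "busybox", "./my_workspace")
print(f"patched image: {image_id}")
```
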
90 changes: 85 additions & 5 deletions torchx/schedulers/kubernetes_scheduler.py
@@ -39,17 +39,20 @@
 import warnings
 from dataclasses import dataclass
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Tuple

 import torchx
 import yaml
 from torchx.schedulers.api import (
     AppDryRunInfo,
     DescribeAppResponse,
-    Scheduler,
+    WorkspaceScheduler,
     Stream,
     filter_regex,
 )
+from torchx.schedulers.docker_scheduler import (
+    build_container_from_workspace,
+)
 from torchx.schedulers.ids import make_unique
 from torchx.specs.api import (
     AppDef,
@@ -67,6 +70,7 @@


 if TYPE_CHECKING:
+    from docker import DockerClient
     from kubernetes.client import ApiClient, CustomObjectsApi
     from kubernetes.client.models import (  # noqa: F401 imported but unused
         V1Pod,
@@ -294,6 +298,7 @@ def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:

 @dataclass
 class KubernetesJob:
+    images_to_push: Dict[str, Tuple[str, str]]
     resource: Dict[str, object]

     def __str__(self) -> str:
@@ -303,7 +308,7 @@ def __repr__(self) -> str:
         return str(self)


-class KubernetesScheduler(Scheduler):
+class KubernetesScheduler(WorkspaceScheduler):
     """
     KubernetesScheduler is a TorchX scheduling interface to Kubernetes.

@@ -347,12 +352,19 @@ class KubernetesScheduler(Scheduler):
             describe: |
                 Partial support. KubernetesScheduler will return job and replica
                 status but does not provide the complete original AppSpec.
+            workspaces: true
     """

-    def __init__(self, session_name: str, client: Optional["ApiClient"] = None) -> None:
+    def __init__(
+        self,
+        session_name: str,
+        client: Optional["ApiClient"] = None,
+        docker_client: Optional["DockerClient"] = None,
+    ) -> None:
         super().__init__("kubernetes", session_name)

         self._client = client
+        self.__docker_client = docker_client

     def _api_client(self) -> "ApiClient":
         from kubernetes import client, config
@@ -374,6 +386,15 @@ def _custom_objects_api(self) -> "CustomObjectsApi":

         return client.CustomObjectsApi(self._api_client())

+    def _docker_client(self) -> "DockerClient":
+        client = self.__docker_client
+        if not client:
+            import docker
+
+            client = docker.from_env()
+            self.__docker_client = client
+        return client
+
     def _get_job_name_from_exception(self, e: "ApiException") -> Optional[str]:
         try:
             return json.loads(e.body)["details"]["name"]
@@ -387,6 +408,22 @@ def schedule(self, dryrun_info: AppDryRunInfo[KubernetesJob]) -> str:
         cfg = dryrun_info._cfg
         assert cfg is not None, f"{dryrun_info} missing cfg"
         namespace = cfg.get("namespace") or "default"
+
+        images_to_push = dryrun_info.request.images_to_push
+        if len(images_to_push) > 0:
+            client = self._docker_client()
+            for local, (repo, tag) in images_to_push.items():
+                logger.info(f"pushing image {repo}:{tag}...")
+                img = client.images.get(local)
+                img.tag(repo, tag=tag)
+                for line in client.images.push(repo, tag=tag, stream=True, decode=True):
+                    ERROR_KEY = "error"
+                    if ERROR_KEY in line:
+                        raise RuntimeError(
+                            f"failed to push docker image: {line[ERROR_KEY]}"
+                        )
+                    logger.info(f"docker: {line}")
+
         resource = dryrun_info.request.resource
         try:
             resp = self._custom_objects_api().create_namespaced_custom_object(
@@ -413,8 +450,32 @@ def _submit_dryrun(
         queue = cfg.get("queue")
         if not isinstance(queue, str):
             raise TypeError(f"config value 'queue' must be a string, got {queue}")
+
+        # map any local images to the remote image
+        images_to_push = {}
+        for role in app.roles:
+            HASH_PREFIX = "sha256:"
+            if role.image.startswith(HASH_PREFIX):
+                image_repo = cfg.get("image_repo")
+                if not image_repo:
+                    raise KeyError(
+                        f"must specify the image repository via `image_repo` config to be able to upload local image {role.image}"
+                    )
+                assert isinstance(image_repo, str), "image_repo must be str"
+
+                image_hash = role.image[len(HASH_PREFIX) :]
+                remote_image = image_repo + ":" + image_hash
+                images_to_push[role.image] = (
+                    image_repo,
+                    image_hash,
+                )
+                role.image = remote_image
+
         resource = app_to_resource(app, queue)
-        req = KubernetesJob(resource=resource)
+        req = KubernetesJob(
+            resource=resource,
+            images_to_push=images_to_push,
+        )
         info = AppDryRunInfo(req, repr)
         info._app = app
         info._cfg = cfg
@@ -445,6 +506,11 @@ def run_opts(self) -> runopts:
         opts.add(
             "queue", type_=str, help="Volcano queue to schedule job in", required=True
         )
+        opts.add(
+            "image_repo",
+            type_=str,
+            help="The image repository to use when pushing patched images; must have push access. Ex: example.com/your/container",
+        )
         return opts

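For illustration, the remote name that `_submit_dryrun` derives from this option is simply `<image_repo>:<image hash>`. A hypothetical example of the mapping:

```python
# illustration only: how a local image hash is mapped to a remote tag
local_image = "sha256:abc123"              # hypothetical locally built image ID
image_repo = "example.com/your/container"  # value of the image_repo run option

image_hash = local_image[len("sha256:"):]    # "abc123"
remote_image = f"{image_repo}:{image_hash}"  # "example.com/your/container:abc123"
```
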
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
Expand Down Expand Up @@ -531,6 +597,20 @@ def log_iter(
else:
return iterator

def build_workspace_image(self, img: str, workspace: str) -> str:
"""
build_workspace_image creates a new image with the files in workspace
overlaid on top of it.

Args:
img: a Docker image to use as a base
workspace: a fsspec path to a directory with contents to be overlaid

Returns:
The new Docker image ID.
"""
return build_container_from_workspace(self._docker_client(), img, workspace)


def create_scheduler(session_name: str, **kwargs: Any) -> KubernetesScheduler:
return KubernetesScheduler(
Expand Down
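Taken together, the intended flow is: build a patched image locally, let `_submit_dryrun` rewrite the local `sha256:` ID to `image_repo:hash`, and let `schedule` push the image before creating the Volcano job. A rough sketch (the app, queue, repository, and paths are placeholders, not a definitive recipe):

```python
from torchx import specs
from torchx.schedulers.kubernetes_scheduler import create_scheduler

scheduler = create_scheduler("demo")

# a minimal single-role app; all values here are placeholders
app = specs.AppDef(
    name="echo",
    roles=[
        specs.Role(
            name="echo",
            image="busybox",
            entrypoint="echo",
            args=["hello world"],
            resource=specs.Resource(cpu=1, gpu=0, memMB=1024),
        ),
    ],
)

# build a patched image locally; returns a "sha256:..." image ID
app.roles[0].image = scheduler.build_workspace_image(
    app.roles[0].image, "/path/to/workspace"
)

cfg = {"queue": "default", "image_repo": "example.com/your/container"}
info = scheduler._submit_dryrun(app, cfg)  # records images_to_push, rewrites role.image
app_id = scheduler.schedule(info)          # pushes the image, then creates the Volcano job
```
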
3 changes: 3 additions & 0 deletions torchx/schedulers/local_scheduler.py
@@ -548,6 +548,9 @@ class LocalScheduler(Scheduler):
                 LocalScheduler supports multiple replicas but all replicas will
                 execute on the local host.
             describe: true
+            workspaces: |
+                Partial support. LocalScheduler runs the app from a local
+                directory but does not support programmatic workspaces.
     """

     def __init__(

5 changes: 5 additions & 0 deletions torchx/schedulers/slurm_scheduler.py
@@ -240,6 +240,11 @@ class SlurmScheduler(Scheduler):
             describe: |
                 Partial support. SlurmScheduler will return job and replica
                 status but does not provide the complete original AppSpec.
+            workspaces: |
+                Partial support. Typical Slurm usage is from a shared NFS mount,
+                so code changes are automatically picked up by the workers.
+                SlurmScheduler does not support programmatic patching via
+                WorkspaceScheduler.

     """

92 changes: 89 additions & 3 deletions torchx/schedulers/test/kubernetes_scheduler_test.py
@@ -10,19 +10,30 @@
 from datetime import datetime
 from unittest.mock import MagicMock, patch

+import fsspec
 import torchx
 from torchx import schedulers, specs

 # @manual=//torchx/schedulers:kubernetes_scheduler
 from torchx.schedulers import kubernetes_scheduler
-from torchx.schedulers.api import DescribeAppResponse
+from torchx.schedulers.api import (
+    DescribeAppResponse,
+    AppDryRunInfo,
+)
+from torchx.schedulers.docker_scheduler import (
+    has_docker,
+)
 from torchx.schedulers.kubernetes_scheduler import (
     app_to_resource,
     cleanup_str,
     create_scheduler,
     role_to_pod,
+    KubernetesScheduler,
+    KubernetesJob,
 )

+SKIP_DOCKER: bool = not has_docker()
+

 def _test_app() -> specs.AppDef:
     trainer_role = specs.Role(
@@ -222,6 +233,31 @@ def test_submit_dryrun(self) -> None:
             """,
         )

+    def test_submit_dryrun_patch(self) -> None:
+        scheduler = create_scheduler("test")
+        app = _test_app()
+        app.roles[0].image = "sha256:testhash"
+        cfg = {
+            "queue": "testqueue",
+            "image_repo": "example.com/some/repo",
+        }
+        with patch(
+            "torchx.schedulers.kubernetes_scheduler.make_unique"
+        ) as make_unique_ctx:
+            make_unique_ctx.return_value = "app-name-42"
+            info = scheduler._submit_dryrun(app, cfg)
+
+        self.assertIn("example.com/some/repo:testhash", str(info.request.resource))
+        self.assertEqual(
+            info.request.images_to_push,
+            {
+                "sha256:testhash": (
+                    "example.com/some/repo",
+                    "testhash",
+                ),
+            },
+        )
+
     @patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
     def test_submit(self, create_namespaced_custom_object: MagicMock) -> None:
         create_namespaced_custom_object.return_value = {
Expand Down Expand Up @@ -252,7 +288,7 @@ def test_submit_job_name_conflict(
from kubernetes.client.rest import ApiException

api_exc = ApiException(status=409, reason="Conflict")
api_exc.body = "{'details':{'name': 'test_job'}}"
api_exc.body = '{"details":{"name": "test_job"}}'
create_namespaced_custom_object.side_effect = api_exc

scheduler = create_scheduler("test")
Expand Down Expand Up @@ -344,7 +380,14 @@ def test_describe_unknown(
def test_runopts(self) -> None:
scheduler = kubernetes_scheduler.create_scheduler("foo")
runopts = scheduler.run_opts()
self.assertEqual(set(runopts._opts.keys()), {"queue", "namespace"})
self.assertEqual(
set(runopts._opts.keys()),
{
"queue",
"namespace",
"image_repo",
},
)

@patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> None:
Expand Down Expand Up @@ -394,6 +437,49 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None:
},
)

def test_build_workspace_image(self) -> None:
img = MagicMock()
img.id = "testimage"
client = MagicMock()
client.images.build.return_value = (img, [])
scheduler = KubernetesScheduler(
"foo",
docker_client=client,
)

fs = fsspec.filesystem("memory")
fs.mkdirs("test_workspace/bar", exist_ok=True)
with fs.open("test_workspace/bar/foo.sh", "w") as f:
f.write("exit 0")

img = scheduler.build_workspace_image(
"busybox",
"memory://test_workspace",
)
self.assertEqual(img, "testimage")

def test_push_patches(self) -> None:
client = MagicMock()
scheduler = KubernetesScheduler(
"foo",
client=MagicMock(),
docker_client=client,
)

job = KubernetesJob(
images_to_push={
"sha256:testimage": ("repo.com/img", "testimage"),
},
resource={},
)

out = scheduler.schedule(AppDryRunInfo(job, repr))
self.assertTrue(out)

self.assertEqual(client.images.get.call_count, 1)
self.assertEqual(client.images.get().tag.call_count, 1)
self.assertEqual(client.images.push.call_count, 1)


class KubernetesSchedulerNoImportTest(unittest.TestCase):
"""
Expand Down