
Commit 2d02dff

d4l3k authored and facebook-github-bot committed
schedulers/kubernetes_scheduler: add workspace/patching support (#384)
Summary: This adds patching support to the Kubernetes scheduler. It requires you to specify `image_repo` as a config option with the Docker repository to push to. If the dryrun/schedule methods find a local image such as `sha256:...`, it is remapped to a remote repo package and pushed during schedule.

Pull Request resolved: #384

Test Plan:

```
pyre
pytest torchx/schedulers/test/kubernetes_scheduler_test.py
```

```
(torchx) tristanr@tristanr-arch2 ~/D/torchx-proj> torchx run --scheduler kubernetes -c queue=default,image_repo=495572122715.dkr.ecr.us-west-2.amazonaws.com/torchx/integration-tests --wait --log utils.sh sh foo.sh
torchx 2022-02-09 15:51:12 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig
torchx 2022-02-09 15:51:12 INFO building patch images for workspace: file:///home/tristanr/Developer/torchx-proj...
torchx 2022-02-09 15:51:13 INFO built image sha256:d1cd394f88861a5ca18de88cc0801513cd6c3dc7d945f7cbfe7121bb1d552bec from ghcr.io/pytorch/torchx:0.1.2dev0
torchx 2022-02-09 15:51:14 INFO pushing image 495572122715.dkr.ecr.us-west-2.amazonaws.com/torchx/integration-tests:d1cd394f88861a5ca18de88cc0801513cd6c3dc7d945f7cbfe7121bb1d552bec...
torchx 2022-02-09 15:51:14 INFO docker: {'status': 'The push refers to repository [495572122715.dkr.ecr.us-west-2.amazonaws.com/torchx/integration-tests]'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': '004e5e059580'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': 'de1d3a8ac491'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': 'e6d41c036803'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': '0827b8e37332'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': 'a8496aa14f72'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': '1f84c52a7d38'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': '0f801b69538d'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': '354dfcbe6a14'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': 'f15a0881ce19'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Preparing', 'progressDetail': {}, 'id': '824bf068fd3d'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Waiting', 'progressDetail': {}, 'id': '1f84c52a7d38'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Waiting', 'progressDetail': {}, 'id': '824bf068fd3d'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Waiting', 'progressDetail': {}, 'id': 'f15a0881ce19'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Waiting', 'progressDetail': {}, 'id': '0f801b69538d'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Waiting', 'progressDetail': {}, 'id': '354dfcbe6a14'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': '0827b8e37332'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': 'de1d3a8ac491'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': 'e6d41c036803'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': '004e5e059580'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': 'a8496aa14f72'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': '824bf068fd3d'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': '0f801b69538d'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': '1f84c52a7d38'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': 'f15a0881ce19'}
torchx 2022-02-09 15:51:15 INFO docker: {'status': 'Layer already exists', 'progressDetail': {}, 'id': '354dfcbe6a14'}
torchx 2022-02-09 15:51:16 INFO docker: {'status': 'd1cd394f88861a5ca18de88cc0801513cd6c3dc7d945f7cbfe7121bb1d552bec: digest: sha256:da9fba179cb37f2f6d6d09c16dc4f0c39ca84a6fbb767c0aff7b77738b608805 size: 2413'}
torchx 2022-02-09 15:51:16 INFO docker: {'progressDetail': {}, 'aux': {'Tag': 'd1cd394f88861a5ca18de88cc0801513cd6c3dc7d945f7cbfe7121bb1d552bec', 'Digest': 'sha256:da9fba179cb37f2f6d6d09c16dc4f0c39ca84a6fbb767c0aff7b77738b608805', 'Size': 2413}}
kubernetes://torchx/default:sh-n71zqm25lrk61
torchx 2022-02-09 15:51:17 INFO Launched app: kubernetes://torchx/default:sh-n71zqm25lrk61
torchx 2022-02-09 15:51:17 INFO AppStatus:
  msg: <NONE>
  num_restarts: -1
  roles: []
  state: PENDING (2)
  structured_error_msg: <NONE>
  ui_url: null
torchx 2022-02-09 15:51:17 INFO Job URL: None
torchx 2022-02-09 15:51:17 INFO Waiting for the app to finish...
torchx 2022-02-09 15:51:17 INFO Waiting for app to start before logging...
torchx 2022-02-09 15:51:22 INFO Job finished: SUCCEEDED
sh/0 2022-02-09T23:51:21.702981500Z foo
```

Reviewed By: kiukchung

Differential Revision: D34125887

Pulled By: d4l3k

fbshipit-source-id: 2d47b678514ad645e11116b34533e17d953d0ddd
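As a rough illustration of the remapping rule described in the summary, here is a minimal, self-contained Python sketch (not part of the commit; `remap_local_image` and all values are hypothetical placeholders): a locally built `sha256:<hash>` image becomes `<image_repo>:<hash>`, and the local-to-(repo, tag) mapping is recorded so the schedule step can tag and push it.

```python
# Hypothetical sketch of the remap rule; not code from this commit.
from typing import Dict, Tuple

HASH_PREFIX = "sha256:"

def remap_local_image(
    image: str, image_repo: str
) -> Tuple[str, Dict[str, Tuple[str, str]]]:
    # Remote images pass through untouched; nothing to push.
    if not image.startswith(HASH_PREFIX):
        return image, {}
    # Local images are retagged as <image_repo>:<hash> and queued for push.
    image_hash = image[len(HASH_PREFIX):]
    return f"{image_repo}:{image_hash}", {image: (image_repo, image_hash)}

print(remap_local_image("sha256:testhash", "example.com/some/repo"))
# -> ('example.com/some/repo:testhash',
#     {'sha256:testhash': ('example.com/some/repo', 'testhash')})
```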
1 parent 6a50643 commit 2d02dff

File tree

8 files changed: +193 −11 lines changed

docs/source/ext/compatibility.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -16,6 +16,7 @@
         "distributed": "Distributed Jobs",
         "cancel": "Cancel Job",
         "describe": "Describe Job",
+        "workspaces": "Workspaces / Patching",
     },
 }
 
```

torchx/cli/test/cmd_run_test.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -160,7 +160,8 @@ def test_run_dryrun(self, mock_runner_run: MagicMock) -> None:
         self.cmd_run.run(args)
         mock_runner_run.assert_not_called()
 
-    def test_runopts_not_found(self) -> None:
+    @patch("torchx.runner.workspaces.WorkspaceRunner._patch_app")
+    def test_runopts_not_found(self, patch_app: MagicMock) -> None:
         args = self.parser.parse_args(
             [
                 "--dryrun",
```

torchx/runner/workspaces.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -103,6 +103,7 @@ def _patch_app(self, app: AppDef, scheduler: str, workspace: str) -> None:
             img = images.get(role.image)
             if not img:
                 img = sched.build_workspace_image(role.image, workspace)
+                log.info(f"built image {img} from {role.image}")
             images[role.image] = img
             role.image = img
 
```

torchx/schedulers/docker_scheduler.py

Lines changed: 7 additions & 2 deletions
```diff
@@ -127,6 +127,7 @@ class DockerScheduler(WorkspaceScheduler):
         describe: |
             Partial support. DockerScheduler will return job and replica
             status but does not provide the complete original AppSpec.
+        workspaces: true
     """
 
     def __init__(self, session_name: str) -> None:
@@ -416,7 +417,7 @@ def build_workspace_image(self, img: str, workspace: str) -> str:
         Returns:
             The new Docker image ID.
         """
-        return _build_container_from_workspace(self._client(), img, workspace)
+        return build_container_from_workspace(self._client(), img, workspace)
 
 
 def _to_str(a: Union[str, bytes]) -> str:
@@ -467,9 +468,13 @@ def _build_context(img: str, workspace: str) -> IO[bytes]:
     return f
 
 
-def _build_container_from_workspace(
+def build_container_from_workspace(
     client: "DockerClient", img: str, workspace: str
 ) -> str:
+    """
+    build_container_from_workspace creates a new Docker container with the
+    workspace filesystem applied as a layer on top of the provided base image.
+    """
     context = _build_context(img, workspace)
 
     try:
```
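Since `_build_container_from_workspace` is renamed and made public here, other schedulers (and in principle user code) can call it directly. A hedged usage sketch, assuming a running local Docker daemon and the `docker` SDK; the base image and fsspec workspace path are illustrative, not from the commit:

```python
# Illustrative only: builds a patched image by overlaying an fsspec
# workspace onto a base image, per the helper's docstring above.
import docker
from torchx.schedulers.docker_scheduler import build_container_from_workspace

client = docker.from_env()
image_id = build_container_from_workspace(
    client,
    img="busybox",                         # base image (placeholder)
    workspace="file:///tmp/my_workspace",  # fsspec path (placeholder)
)
print(image_id)  # ID of the new Docker image
```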

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 85 additions & 5 deletions
```diff
@@ -39,17 +39,20 @@
 import warnings
 from dataclasses import dataclass
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Tuple
 
 import torchx
 import yaml
 from torchx.schedulers.api import (
     AppDryRunInfo,
     DescribeAppResponse,
-    Scheduler,
+    WorkspaceScheduler,
     Stream,
     filter_regex,
 )
+from torchx.schedulers.docker_scheduler import (
+    build_container_from_workspace,
+)
 from torchx.schedulers.ids import make_unique
 from torchx.specs.api import (
     AppDef,
@@ -67,6 +70,7 @@
 
 
 if TYPE_CHECKING:
+    from docker import DockerClient
     from kubernetes.client import ApiClient, CustomObjectsApi
     from kubernetes.client.models import (  # noqa: F401 imported but unused
         V1Pod,
@@ -294,6 +298,7 @@ def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
 
 @dataclass
 class KubernetesJob:
+    images_to_push: Dict[str, Tuple[str, str]]
     resource: Dict[str, object]
 
     def __str__(self) -> str:
@@ -303,7 +308,7 @@ def __repr__(self) -> str:
         return str(self)
 
 
-class KubernetesScheduler(Scheduler):
+class KubernetesScheduler(WorkspaceScheduler):
     """
     KubernetesScheduler is a TorchX scheduling interface to Kubernetes.
 
@@ -347,12 +352,19 @@ class KubernetesScheduler(Scheduler):
         describe: |
             Partial support. KubernetesScheduler will return job and replica
             status but does not provide the complete original AppSpec.
+        workspaces: true
     """
 
-    def __init__(self, session_name: str, client: Optional["ApiClient"] = None) -> None:
+    def __init__(
+        self,
+        session_name: str,
+        client: Optional["ApiClient"] = None,
+        docker_client: Optional["DockerClient"] = None,
+    ) -> None:
         super().__init__("kubernetes", session_name)
 
         self._client = client
+        self.__docker_client = docker_client
 
     def _api_client(self) -> "ApiClient":
         from kubernetes import client, config
@@ -374,6 +386,15 @@ def _custom_objects_api(self) -> "CustomObjectsApi":
 
         return client.CustomObjectsApi(self._api_client())
 
+    def _docker_client(self) -> "DockerClient":
+        client = self.__docker_client
+        if not client:
+            import docker
+
+            client = docker.from_env()
+            self.__docker_client = client
+        return client
+
     def _get_job_name_from_exception(self, e: "ApiException") -> Optional[str]:
         try:
             return json.loads(e.body)["details"]["name"]
@@ -387,6 +408,22 @@ def schedule(self, dryrun_info: AppDryRunInfo[KubernetesJob]) -> str:
         cfg = dryrun_info._cfg
         assert cfg is not None, f"{dryrun_info} missing cfg"
         namespace = cfg.get("namespace") or "default"
+
+        images_to_push = dryrun_info.request.images_to_push
+        if len(images_to_push) > 0:
+            client = self._docker_client()
+            for local, (repo, tag) in images_to_push.items():
+                logger.info(f"pushing image {repo}:{tag}...")
+                img = client.images.get(local)
+                img.tag(repo, tag=tag)
+                for line in client.images.push(repo, tag=tag, stream=True, decode=True):
+                    ERROR_KEY = "error"
+                    if ERROR_KEY in line:
+                        raise RuntimeError(
+                            f"failed to push docker image: {line[ERROR_KEY]}"
+                        )
+                    logger.info(f"docker: {line}")
+
         resource = dryrun_info.request.resource
         try:
             resp = self._custom_objects_api().create_namespaced_custom_object(
@@ -413,8 +450,32 @@ def _submit_dryrun(
         queue = cfg.get("queue")
         if not isinstance(queue, str):
             raise TypeError(f"config value 'queue' must be a string, got {queue}")
+
+        # map any local images to the remote image
+        images_to_push = {}
+        for role in app.roles:
+            HASH_PREFIX = "sha256:"
+            if role.image.startswith(HASH_PREFIX):
+                image_repo = cfg.get("image_repo")
+                if not image_repo:
+                    raise KeyError(
+                        f"must specify the image repository via `image_repo` config to be able to upload local image {role.image}"
+                    )
+                assert isinstance(image_repo, str), "image_repo must be str"
+
+                image_hash = role.image[len(HASH_PREFIX) :]
+                remote_image = image_repo + ":" + image_hash
+                images_to_push[role.image] = (
+                    image_repo,
+                    image_hash,
+                )
+                role.image = remote_image
+
         resource = app_to_resource(app, queue)
-        req = KubernetesJob(resource=resource)
+        req = KubernetesJob(
+            resource=resource,
+            images_to_push=images_to_push,
+        )
         info = AppDryRunInfo(req, repr)
         info._app = app
         info._cfg = cfg
@@ -445,6 +506,11 @@ def run_opts(self) -> runopts:
         opts.add(
             "queue", type_=str, help="Volcano queue to schedule job in", required=True
         )
+        opts.add(
+            "image_repo",
+            type_=str,
+            help="The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container",
+        )
         return opts
 
     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
@@ -531,6 +597,20 @@ def log_iter(
         else:
             return iterator
 
+    def build_workspace_image(self, img: str, workspace: str) -> str:
+        """
+        build_workspace_image creates a new image with the files in workspace
+        overlaid on top of it.
+
+        Args:
+            img: a Docker image to use as a base
+            workspace: a fsspec path to a directory with contents to be overlaid
+
+        Returns:
+            The new Docker image ID.
+        """
+        return build_container_from_workspace(self._docker_client(), img, workspace)
+
 
 def create_scheduler(session_name: str, **kwargs: Any) -> KubernetesScheduler:
     return KubernetesScheduler(
```
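Putting the two halves together, a hedged end-to-end sketch of the dryrun-then-schedule flow this diff enables (mirroring the unit tests below; `_submit_dryrun` is an internal method, the app and config values are placeholders, and a reachable cluster plus local Docker daemon are assumed):

```python
# Illustrative only: the local image is remapped during dryrun, then tagged
# and pushed during schedule before the Volcano job is created.
from torchx import specs
from torchx.schedulers.kubernetes_scheduler import create_scheduler

scheduler = create_scheduler("example")
app = specs.AppDef(
    name="sh",
    roles=[
        specs.Role(
            name="sh",
            image="sha256:testhash",  # a locally built, patched image
            entrypoint="sh",
            args=["foo.sh"],
        )
    ],
)
cfg = {"queue": "default", "image_repo": "example.com/some/repo"}

info = scheduler._submit_dryrun(app, cfg)
# info.request.images_to_push ==
#     {"sha256:testhash": ("example.com/some/repo", "testhash")}
app_id = scheduler.schedule(info)  # pushes the image, then creates the job
print(app_id)
```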

torchx/schedulers/local_scheduler.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -548,6 +548,9 @@ class LocalScheduler(Scheduler):
         LocalScheduler supports multiple replicas but all replicas will
         execute on the local host.
         describe: true
+        workspaces: |
+            Partial support. LocalScheduler runs the app from a local
+            directory but does not support programmatic workspaces.
     """
 
     def __init__(
```

torchx/schedulers/slurm_scheduler.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -240,6 +240,11 @@ class SlurmScheduler(Scheduler):
         describe: |
             Partial support. SlurmScheduler will return job and replica
             status but does not provide the complete original AppSpec.
+        workspaces: |
+            Partial support. Typical Slurm usage is from a shared NFS mount
+            so code will automatically be updated on the workers.
+            SlurmScheduler does not support programmatic patching via
+            WorkspaceScheduler.
 
     """
 
```

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 89 additions & 3 deletions
```diff
@@ -10,19 +10,30 @@
 from datetime import datetime
 from unittest.mock import MagicMock, patch
 
+import fsspec
 import torchx
 from torchx import schedulers, specs
 
 # @manual=//torchx/schedulers:kubernetes_scheduler
 from torchx.schedulers import kubernetes_scheduler
-from torchx.schedulers.api import DescribeAppResponse
+from torchx.schedulers.api import (
+    DescribeAppResponse,
+    AppDryRunInfo,
+)
+from torchx.schedulers.docker_scheduler import (
+    has_docker,
+)
 from torchx.schedulers.kubernetes_scheduler import (
     app_to_resource,
     cleanup_str,
     create_scheduler,
     role_to_pod,
+    KubernetesScheduler,
+    KubernetesJob,
 )
 
+SKIP_DOCKER: bool = not has_docker()
+
 
 def _test_app() -> specs.AppDef:
     trainer_role = specs.Role(
@@ -222,6 +233,31 @@ def test_submit_dryrun(self) -> None:
             """,
         )
 
+    def test_submit_dryrun_patch(self) -> None:
+        scheduler = create_scheduler("test")
+        app = _test_app()
+        app.roles[0].image = "sha256:testhash"
+        cfg = {
+            "queue": "testqueue",
+            "image_repo": "example.com/some/repo",
+        }
+        with patch(
+            "torchx.schedulers.kubernetes_scheduler.make_unique"
+        ) as make_unique_ctx:
+            make_unique_ctx.return_value = "app-name-42"
+            info = scheduler._submit_dryrun(app, cfg)
+
+        self.assertIn("example.com/some/repo:testhash", str(info.request.resource))
+        self.assertEqual(
+            info.request.images_to_push,
+            {
+                "sha256:testhash": (
+                    "example.com/some/repo",
+                    "testhash",
+                ),
+            },
+        )
+
     @patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
     def test_submit(self, create_namespaced_custom_object: MagicMock) -> None:
         create_namespaced_custom_object.return_value = {
@@ -252,7 +288,7 @@ def test_submit_job_name_conflict(
         from kubernetes.client.rest import ApiException
 
         api_exc = ApiException(status=409, reason="Conflict")
-        api_exc.body = "{'details':{'name': 'test_job'}}"
+        api_exc.body = '{"details":{"name": "test_job"}}'
         create_namespaced_custom_object.side_effect = api_exc
 
         scheduler = create_scheduler("test")
@@ -344,7 +380,14 @@ def test_describe_unknown(
     def test_runopts(self) -> None:
         scheduler = kubernetes_scheduler.create_scheduler("foo")
         runopts = scheduler.run_opts()
-        self.assertEqual(set(runopts._opts.keys()), {"queue", "namespace"})
+        self.assertEqual(
+            set(runopts._opts.keys()),
+            {
+                "queue",
+                "namespace",
+                "image_repo",
+            },
+        )
 
     @patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
     def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> None:
@@ -394,6 +437,49 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None:
             },
         )
 
+    def test_build_workspace_image(self) -> None:
+        img = MagicMock()
+        img.id = "testimage"
+        client = MagicMock()
+        client.images.build.return_value = (img, [])
+        scheduler = KubernetesScheduler(
+            "foo",
+            docker_client=client,
+        )
+
+        fs = fsspec.filesystem("memory")
+        fs.mkdirs("test_workspace/bar", exist_ok=True)
+        with fs.open("test_workspace/bar/foo.sh", "w") as f:
+            f.write("exit 0")
+
+        img = scheduler.build_workspace_image(
+            "busybox",
+            "memory://test_workspace",
+        )
+        self.assertEqual(img, "testimage")
+
+    def test_push_patches(self) -> None:
+        client = MagicMock()
+        scheduler = KubernetesScheduler(
+            "foo",
+            client=MagicMock(),
+            docker_client=client,
+        )
+
+        job = KubernetesJob(
+            images_to_push={
+                "sha256:testimage": ("repo.com/img", "testimage"),
+            },
+            resource={},
+        )
+
+        out = scheduler.schedule(AppDryRunInfo(job, repr))
+        self.assertTrue(out)
+
+        self.assertEqual(client.images.get.call_count, 1)
+        self.assertEqual(client.images.get().tag.call_count, 1)
+        self.assertEqual(client.images.push.call_count, 1)
+
 
 class KubernetesSchedulerNoImportTest(unittest.TestCase):
     """
```
