diff --git a/scripts/awsbatchint.sh b/scripts/awsbatchint.sh
index 476b9b61d..6dd0160ab 100755
--- a/scripts/awsbatchint.sh
+++ b/scripts/awsbatchint.sh
@@ -7,7 +7,28 @@
 
 set -ex
 
-RUN_ARGS="--scheduler aws_batch -c queue=torchx,image_repo=495572122715.dkr.ecr.us-west-2.amazonaws.com/torchx/integration-tests utils.echo"
+JOB="$USER-$(uuidgen)"
+DIR="/tmp/$JOB"
+
+function cleanup {
+  rm -r "$DIR"
+}
+trap cleanup EXIT
+
+mkdir "$DIR"
+cd "$DIR"
+
+cat <<EOT > .torchxconfig
+[aws_batch]
+queue=torchx
+image_repo=495572122715.dkr.ecr.us-west-2.amazonaws.com/torchx/integration-tests
+EOT
+
+cat <<EOT > main.py
+print("hello world!")
+EOT
+
+RUN_ARGS="--scheduler aws_batch dist.ddp -j 2x1 --script main.py"
 
 if [ -z "$AWS_ROLE_ARN" ]; then
   # only dryrun if no secrets
@@ -19,11 +40,11 @@ else
   torchx status "$APP_ID"
   torchx describe "$APP_ID"
   torchx log "$APP_ID"
-  LINES="$(torchx log "$APP_ID" | wc -l)"
+  LINES="$(torchx log "$APP_ID" | grep -c 'hello world')"
 
-  if [ "$LINES" -ne 1 ]
+  if [ "$LINES" -ne 2 ]
   then
-    echo "expected 1 log lines"
+    echo "expected 2 log lines"
     exit 1
   fi
 fi
diff --git a/scripts/slurmint.sh b/scripts/slurmint.sh
index 40cb65a88..4d0c013c9 100755
--- a/scripts/slurmint.sh
+++ b/scripts/slurmint.sh
@@ -20,7 +20,7 @@ if [[ -z "${SLURM_INSTANCE_MASTER}" ]]; then
 fi
 
 JOB="$USER-$(uuidgen)"
-DIR="/tmp/$JOB"
+DIR="/home/ubuntu/integ-tests/$JOB"
 VENV="$DIR/venv"
 
 function run_cmd {
@@ -44,8 +44,8 @@ REMOTE_WHEEL="$DIR/$(basename "$WHEEL")"
 SCRIPT="scripts/slurmtest.sh"
 REMOTE_SCRIPT="$DIR/$(basename "$SCRIPT")"
 
-run_cmd mkdir "$DIR"
-run_cmd virtualenv -p /usr/bin/python3.8 "$VENV"
+run_cmd mkdir -p "$DIR"
+run_cmd virtualenv -p /home/ubuntu/miniconda3/bin/python "$VENV"
 run_scp "$WHEEL" "$REMOTE_WHEEL"
 run_scp "$SCRIPT" "$REMOTE_SCRIPT"
 run_cmd "$REMOTE_SCRIPT" "$REMOTE_WHEEL" "$VENV"
diff --git a/scripts/slurmtest.sh b/scripts/slurmtest.sh
index f2f556b3c..10f319f78 100755
--- a/scripts/slurmtest.sh
+++ b/scripts/slurmtest.sh
@@ -10,23 +10,43 @@ set -ex
 REMOTE_WHEEL="$1"
 VENV="$2"
 
+BASE_DIR="$(dirname "$REMOTE_WHEEL")"
+DIR="$BASE_DIR/project"
+mkdir "$DIR"
+cd "$DIR"
+
 # shellcheck disable=SC1091
 source /opt/slurm/etc/slurm.sh
 sbatch --version
 # shellcheck disable=SC1090
 source "$VENV"/bin/activate
 python --version
+
 pip install "$REMOTE_WHEEL"
+pip install numpy
+pip install torch==1.10.2+cpu -f https://download.pytorch.org/whl/cpu/torch_stable.html
+
+cat <<EOT > .torchxconfig
+[slurm]
+partition=compute
+time=10
+comment=hello
+nomem=true
+EOT
+
+cat <<EOT > main.py
+print("hello world!")
+EOT
 
-APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10,comment=hello utils.echo --num_replicas 3)"
+APP_ID="$(torchx run --wait --log --scheduler slurm dist.ddp -j 2x1 --script main.py)"
 torchx status "$APP_ID"
 torchx describe "$APP_ID"
 sacct -j "$(basename "$APP_ID")"
 torchx log "$APP_ID"
-LINES="$(torchx log "$APP_ID" | wc -l)"
+LINES="$(torchx log "$APP_ID" | grep -c 'hello world')"
 
-if [ "$LINES" -ne 3 ]
+if [ "$LINES" -ne 2 ]
 then
-  echo "expected 3 log lines"
+  echo "expected 2 log lines"
   exit 1
 fi
diff --git a/torchx/components/dist.py b/torchx/components/dist.py
index b8c2654f7..3bedb0781 100644
--- a/torchx/components/dist.py
+++ b/torchx/components/dist.py
@@ -121,8 +121,9 @@ Components APIs
 -----------------
 """
 
+import shlex
 from pathlib import Path
-from typing import Dict, Optional
+from typing import Dict, Optional, Iterable
 
 import torchx
 import torchx.specs as specs
@@ -131,16 +132,19 @@
 def ddp(
     *script_args: str,
-    script: str,
+    script: Optional[str] = None,
+    m: Optional[str] = None,
     image: str = torchx.IMAGE,
     name: Optional[str] = None,
+    h: Optional[str] = None,
     cpu: int = 2,
     gpu: int = 0,
     memMB: int = 1024,
-    h: Optional[str] = None,
     j: str = "1x2",
     env: Optional[Dict[str, str]] = None,
-    rdzv_endpoint: str = "etcd-server.default.svc.cluster.local:2379",
+    max_restarts: Optional[int] = None,
+    rdzv_backend: str = "c10d",
+    rdzv_endpoint: Optional[str] = None,
 ) -> specs.AppDef:
     """
     Distributed data parallel style application (one role, multi-replica).
@@ -154,6 +158,7 @@
     Args:
         script_args: arguments to the main module
         script: script or binary to run within the image
+        m: the python module path to run
         image: image (e.g. docker)
         name: job name override (uses the script name if not specified)
         cpu: number of cpus per replica
@@ -162,9 +167,14 @@
         h: a registered named resource (if specified takes precedence over cpu, gpu, memMB)
         j: {nnodes}x{nproc_per_node}, for gpu hosts, nproc_per_node must not exceed num gpus
         env: environment varibles to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)
-        rdzv_endpoint: etcd server endpoint (only matters when nnodes > 1)
+        max_restarts: the number of restarts allowed
+        rdzv_backend: rendezvous backend (only matters when nnodes > 1)
+        rdzv_endpoint: rendezvous server endpoint (only matters when nnodes > 1), defaults to rank0 host for schedulers that support it
     """
+    if (script is None) == (m is None):
+        raise ValueError("exactly one of --script and -m must be specified")
+
     rep = j.split("x")
     if len(rep) == 1:  # num replicas only
         nnodes = 1
@@ -175,33 +185,79 @@
     else:
         raise ValueError(f"Invalid format for -j, usage example: 1x4. Given: {j}")
 
-    script_name_noext = Path(script).stem  # script name no extension
+    if script:
+        # script name/module no extension
+        role_name = Path(script).stem
+    elif m:
+        role_name = m.rpartition(".")[2]
+    else:
+        raise ValueError("failed to compute role_name")
+
+    if rdzv_endpoint is None:
+        rdzv_endpoint = _noquote(f"$${macros.rank0_env}:29500")
+
+    if nnodes == 1:
+        rdzv_backend = "c10d"
+        rdzv_endpoint = "localhost:29500"
+
+    if env is None:
+        env = {}
+    env.setdefault("LOGLEVEL", "INFO")
+
+    cmd = [
+        "python",
+        "-m",
+        "torch.distributed.run",
+        "--rdzv_backend",
+        rdzv_backend,
+        "--rdzv_endpoint",
+        rdzv_endpoint,
+        "--rdzv_id",
+        f"{macros.app_id}",
+        "--nnodes",
+        str(nnodes),
+        "--nproc_per_node",
+        str(nproc_per_node),
+    ]
+    if max_restarts is not None:
+        cmd += ["--max_restarts", str(max_restarts)]
+    if script is not None:
+        cmd += [script]
+    elif m is not None:
+        cmd += ["-m", m]
+    cmd += script_args
 
     return specs.AppDef(
-        name=name or script_name_noext,
+        name=name or role_name,
         roles=[
             specs.Role(
-                name=script_name_noext,
+                name=role_name,
                 image=image,
-                entrypoint="python",
+                entrypoint="bash",
                 num_replicas=nnodes,
                 resource=specs.resource(cpu=cpu, gpu=gpu, memMB=memMB, h=h),
-                args=[
-                    "-m",
-                    "torch.distributed.run",
-                    "--rdzv_backend",
-                    ("c10d" if nnodes == 1 else "etcd"),
-                    "--rdzv_endpoint",
-                    ("localhost:29500" if nnodes == 1 else rdzv_endpoint),
-                    "--rdzv_id",
-                    f"{macros.app_id}",
-                    "--nnodes",
-                    str(nnodes),
-                    "--nproc_per_node",
-                    str(nproc_per_node),
-                    script,
-                    *script_args,
-                ],
-                env=env or {},
+                args=["-c", _args_join(cmd)],
+                env=env,
+                port_map={
+                    "c10d": 29500,
+                },
             )
         ],
     )
+
+
+def _args_join(args: Iterable[str]) -> str:
+    """
+    _args_join is like shlex.join but if the argument is wrapped in _noquote
+    it'll not quote that argument.
+ """ + quoted = [arg if isinstance(arg, _noquote) else shlex.quote(arg) for arg in args] + return " ".join(quoted) + + +class _noquote(str): + """ + _noquote is a wrapper around str that indicates that the argument shouldn't + be passed through shlex.quote. + """ + + pass diff --git a/torchx/components/integration_tests/component_provider.py b/torchx/components/integration_tests/component_provider.py index 7145818fa..98bf4c0f2 100644 --- a/torchx/components/integration_tests/component_provider.py +++ b/torchx/components/integration_tests/component_provider.py @@ -36,12 +36,12 @@ def tearDown(self) -> None: class DDPComponentProvider(ComponentProvider): def get_app_def(self) -> AppDef: - rdzv_endpoint: str = "localhost:29400" return dist_components.ddp( script="torchx/components/integration_tests/test/dummy_app.py", name="ddp-trainer", image=self._image, - rdzv_endpoint=rdzv_endpoint, + j="2x2", + max_restarts=3, ) diff --git a/torchx/schedulers/api.py b/torchx/schedulers/api.py index e2521df71..126e59fbc 100644 --- a/torchx/schedulers/api.py +++ b/torchx/schedulers/api.py @@ -24,6 +24,7 @@ SchedulerBackend, runopts, ) +from torchx.workspace.api import Workspace class Stream(str, Enum): @@ -90,13 +91,25 @@ def close(self) -> None: """ pass - def submit(self, app: AppDef, cfg: Mapping[str, CfgVal]) -> str: + def submit( + self, + app: AppDef, + cfg: Mapping[str, CfgVal], + workspace: Optional[str] = None, + ) -> str: """ Submits the application to be run by the scheduler. + WARNING: Mostly used for tests. Users should prefer to use the TorchX runner instead. + Returns: The application id that uniquely identifies the submitted app. """ + if workspace: + sched = self + assert isinstance(sched, Workspace) + role = app.roles[0] + sched.build_workspace_and_update_role(role, workspace) dryrun_info = self.submit_dryrun(app, cfg) return self.schedule(dryrun_info) diff --git a/torchx/schedulers/aws_batch_scheduler.py b/torchx/schedulers/aws_batch_scheduler.py index 94487b5d0..0dc2c3062 100644 --- a/torchx/schedulers/aws_batch_scheduler.py +++ b/torchx/schedulers/aws_batch_scheduler.py @@ -223,16 +223,28 @@ def _submit_dryrun( for role_idx, role in enumerate(app.roles): for replica_id in range(role.num_replicas): + rank = len(nodes) values = macros.Values( img_root="", app_id=name, replica_id=str(replica_id), + rank0_env=( + "TORCHX_RANK0_HOST" + if rank == 0 + else "AWS_BATCH_JOB_MAIN_NODE_PRIVATE_IPV4_ADDRESS" + ), ) replica_role = values.apply(role) replica_role.env["TORCHX_ROLE_IDX"] = str(role_idx) replica_role.env["TORCHX_ROLE_NAME"] = str(role.name) replica_role.env["TORCHX_REPLICA_IDX"] = str(replica_id) - nodes.append(role_to_node_properties(len(nodes), replica_role)) + if rank == 0: + # AWS_BATCH_JOB_MAIN_NODE_PRIVATE_IPV4_ADDRESS is only + # available on the child workers so we set the address to + # localhost for rank0. 
+                    # See: https://docs.aws.amazon.com/batch/latest/userguide/job_env_vars.html
+                    replica_role.env["TORCHX_RANK0_HOST"] = "localhost"
+                nodes.append(role_to_node_properties(rank, replica_role))
 
         req = BatchJob(
             name=name,
diff --git a/torchx/schedulers/docker_scheduler.py b/torchx/schedulers/docker_scheduler.py
index 5ec9597bc..28ade8363 100644
--- a/torchx/schedulers/docker_scheduler.py
+++ b/torchx/schedulers/docker_scheduler.py
@@ -185,12 +185,14 @@ def _submit_dryrun(
         app_id = make_unique(app.name)
         req = DockerJob(app_id=app_id, containers=[])
+        rank0_name = f"{app_id}-{app.roles[0].name}-0"
         for role in app.roles:
             for replica_id in range(role.num_replicas):
                 values = macros.Values(
                     img_root="",
                     app_id=app_id,
                     replica_id=str(replica_id),
+                    rank0_env="TORCHX_RANK0_HOST",
                 )
                 replica_role = values.apply(role)
                 name = f"{app_id}-{role.name}-{replica_id}"
@@ -199,6 +201,9 @@ def _submit_dryrun(
                 if replica_role.env:
                     env.update(replica_role.env)
 
+                # configure distributed host envs
+                env["TORCHX_RANK0_HOST"] = rank0_name
+
                 c = DockerContainer(
                     image=replica_role.image,
                     command=[replica_role.entrypoint] + replica_role.args,
diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py
index 07ee4bb23..af1b1534e 100644
--- a/torchx/schedulers/kubernetes_scheduler.py
+++ b/torchx/schedulers/kubernetes_scheduler.py
@@ -254,9 +254,14 @@ def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
                 img_root="",
                 app_id=unique_app_id,
                 replica_id=str(replica_id),
+                rank0_env=f"VC_{cleanup_str(app.roles[0].name)}_0_HOSTS".upper(),
             )
+            if role_idx == 0 and replica_id == 0:
+                values.rank0_env = "TORCHX_RANK0_HOST"
             name = cleanup_str(f"{role.name}-{replica_id}")
             replica_role = values.apply(role)
+            if role_idx == 0 and replica_id == 0:
+                replica_role.env["TORCHX_RANK0_HOST"] = "localhost"
             pod = role_to_pod(name, replica_role)
             pod.metadata.labels.update(pod_labels(app, role_idx, role, replica_id))
diff --git a/torchx/schedulers/local_scheduler.py b/torchx/schedulers/local_scheduler.py
index 68a788860..5efd0537d 100644
--- a/torchx/schedulers/local_scheduler.py
+++ b/torchx/schedulers/local_scheduler.py
@@ -864,8 +864,10 @@ def _to_popen_request(
                     img_root=img_root,
                     app_id=app_id,
                     replica_id=str(replica_id),
+                    rank0_env="TORCHX_RANK0_HOST",
                 )
                 replica_role = values.apply(role)
+                replica_role.env["TORCHX_RANK0_HOST"] = "localhost"
                 replica_log_dir = os.path.join(app_log_dir, role.name, str(replica_id))
 
                 if "TORCHELASTIC_ERROR_FILE" not in replica_role.env:
diff --git a/torchx/schedulers/ray_scheduler.py b/torchx/schedulers/ray_scheduler.py
index e5faf3184..0ffb22502 100644
--- a/torchx/schedulers/ray_scheduler.py
+++ b/torchx/schedulers/ray_scheduler.py
@@ -45,6 +45,12 @@
 except ImportError:
     _has_ray = False
 
+
+def has_ray() -> bool:
+    """Indicates whether Ray is installed in the current Python environment."""
+    return _has_ray
+
+
 if _has_ray:
 
     _logger: logging.Logger = logging.getLogger(__name__)
@@ -70,10 +76,6 @@ def serialize(
         with open(os.path.join(dirpath, output_filename), "w") as tmp:
             json.dump(actors_json, tmp)
 
-    def has_ray() -> bool:
-        """Indicates whether Ray is installed in the current Python environment."""
-        return _has_ray
-
     @dataclass
     class RayJob:
         """Represents a job that should be run on a Ray cluster.
@@ -230,7 +232,10 @@ def _submit_dryrun(
                 # Replace the ${img_root}, ${app_id}, and ${replica_id} placeholders
                 # in arguments and environment variables.
                 role = macros.Values(
-                    img_root=role.image, app_id=app_id, replica_id="${rank}"
+                    img_root=role.image,
+                    app_id=app_id,
+                    replica_id="${rank}",
+                    rank0_env="MASTER_ADDR",
                 ).apply(role)
 
                 actor = RayActor(
diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py
index 19ca645b2..cd41f0c84 100644
--- a/torchx/schedulers/slurm_scheduler.py
+++ b/torchx/schedulers/slurm_scheduler.py
@@ -323,6 +323,7 @@ def _submit_dryrun(
                     img_root=role.image,
                     app_id=macros.app_id,
                     replica_id=str(replica_id),
+                    rank0_env="SLURM_JOB_NODELIST_HET_GROUP_0",
                 )
                 name = f"{role.name}-{replica_id}"
                 replica_role = values.apply(role)
diff --git a/torchx/schedulers/test/api_test.py b/torchx/schedulers/test/api_test.py
index 0f8057c2e..d7343c0b4 100644
--- a/torchx/schedulers/test/api_test.py
+++ b/torchx/schedulers/test/api_test.py
@@ -20,11 +20,13 @@
     InvalidRunConfigException,
     Resource,
     runopts,
+    Role,
 )
+from torchx.workspace.api import Workspace
 
 
 class SchedulerTest(unittest.TestCase):
-    class MockScheduler(Scheduler):
+    class MockScheduler(Scheduler, Workspace):
         def __init__(self, session_name: str) -> None:
             super().__init__("mock", session_name)
@@ -67,6 +69,9 @@ def run_opts(self) -> runopts:
         def resolve_resource(self, resource: Union[str, Resource]) -> Resource:
             return NULL_RESOURCE
 
+        def build_workspace_and_update_role(self, role: Role, workspace: str) -> None:
+            role.image = workspace
+
     def test_invalid_run_cfg(self) -> None:
         scheduler_mock = SchedulerTest.MockScheduler("test_session")
         app_mock = MagicMock()
@@ -79,6 +84,20 @@ def test_invalid_run_cfg(self) -> None:
             bad_type_cfg = {"foo": 100}
             scheduler_mock.submit(app_mock, bad_type_cfg)
 
+    def test_submit_workspace(self) -> None:
+        role = Role(
+            name="sleep",
+            image="",
+            entrypoint="foo.sh",
+        )
+        app = AppDef(name="test_app", roles=[role])
+
+        scheduler_mock = SchedulerTest.MockScheduler("test_session")
+
+        bad_type_cfg = {"foo": "asdf"}
+        scheduler_mock.submit(app, bad_type_cfg, workspace="some_workspace")
+        self.assertEqual(app.roles[0].image, "some_workspace")
+
     def test_invalid_dryrun_cfg(self) -> None:
         scheduler_mock = SchedulerTest.MockScheduler("test_session")
         app_mock = MagicMock()
diff --git a/torchx/schedulers/test/aws_batch_scheduler_test.py b/torchx/schedulers/test/aws_batch_scheduler_test.py
index 2075c404d..a9b9518cd 100644
--- a/torchx/schedulers/test/aws_batch_scheduler_test.py
+++ b/torchx/schedulers/test/aws_batch_scheduler_test.py
@@ -22,7 +22,14 @@ def _test_app() -> specs.AppDef:
         name="trainer",
         image="pytorch/torchx:latest",
         entrypoint="main",
-        args=["--output-path", specs.macros.img_root, "--app-id", specs.macros.app_id],
+        args=[
+            "--output-path",
+            specs.macros.img_root,
+            "--app-id",
+            specs.macros.app_id,
+            "--rank0_env",
+            specs.macros.rank0_env,
+        ],
         env={"FOO": "bar"},
         resource=specs.Resource(
             cpu=2,
@@ -30,7 +37,7 @@ def _test_app() -> specs.AppDef:
             gpu=4,
         ),
         port_map={"foo": 1234},
-        num_replicas=1,
+        num_replicas=2,
         max_retries=3,
     )
@@ -73,7 +80,7 @@ def test_submit_dryrun(self) -> None:
                 "jobDefinitionName": "app-name-42",
                 "type": "multinode",
                 "nodeProperties": {
-                    "numNodes": 1,
+                    "numNodes": 2,
                     "mainNode": 0,
                     "nodeRangeProperties": [
                         {
@@ -85,6 +92,8 @@
                                     "",
                                     "--app-id",
                                     "app-name-42",
+                                    "--rank0_env",
+                                    "TORCHX_RANK0_HOST",
                                 ],
                                 "image": "pytorch/torchx:latest",
                                 "environment": [
@@ -92,6 +101,7 @@
                                     {"name": "FOO", "value": "bar"},
                                     {"name": "TORCHX_ROLE_IDX", "value": "0"},
                                     {"name": "TORCHX_ROLE_NAME", "value": "trainer"},
                                     {"name": "TORCHX_REPLICA_IDX", "value": "0"},
"value": "0"}, + {"name": "TORCHX_RANK0_HOST", "value": "localhost"}, ], "resourceRequirements": [ {"type": "VCPU", "value": "2"}, @@ -100,7 +110,34 @@ def test_submit_dryrun(self) -> None: ], "logConfiguration": {"logDriver": "awslogs"}, }, - } + }, + { + "targetNodes": "1", + "container": { + "command": [ + "main", + "--output-path", + "", + "--app-id", + "app-name-42", + "--rank0_env", + "AWS_BATCH_JOB_MAIN_NODE_PRIVATE_IPV4_ADDRESS", + ], + "image": "pytorch/torchx:latest", + "environment": [ + {"name": "FOO", "value": "bar"}, + {"name": "TORCHX_ROLE_IDX", "value": "0"}, + {"name": "TORCHX_ROLE_NAME", "value": "trainer"}, + {"name": "TORCHX_REPLICA_IDX", "value": "1"}, + ], + "resourceRequirements": [ + {"type": "VCPU", "value": "2"}, + {"type": "MEMORY", "value": "3000"}, + {"type": "GPU", "value": "4"}, + ], + "logConfiguration": {"logDriver": "awslogs"}, + }, + }, ], }, "retryStrategy": { @@ -180,6 +217,10 @@ def _mock_scheduler(self) -> AWSBatchScheduler: {"name": "TORCHX_ROLE_IDX", "value": "0"}, {"name": "TORCHX_REPLICA_IDX", "value": "0"}, {"name": "TORCHX_ROLE_NAME", "value": "echo"}, + { + "name": "TORCHX_RANK0_HOST", + "value": "localhost", + }, ], "mountPoints": [], "ulimits": [], @@ -274,6 +315,7 @@ def test_describe(self) -> None: "TORCHX_ROLE_IDX": "0", "TORCHX_REPLICA_IDX": "0", "TORCHX_ROLE_NAME": "echo", + "TORCHX_RANK0_HOST": "localhost", }, ), ) diff --git a/torchx/schedulers/test/docker_scheduler_test.py b/torchx/schedulers/test/docker_scheduler_test.py index 7f2fd0c1f..c94c2e151 100644 --- a/torchx/schedulers/test/docker_scheduler_test.py +++ b/torchx/schedulers/test/docker_scheduler_test.py @@ -5,12 +5,15 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
+import posixpath
 import unittest
 from datetime import datetime, timedelta
 from unittest.mock import patch
 
+import fsspec
 from docker.types import DeviceRequest
 from torchx import specs
+from torchx.components.dist import ddp
 from torchx.schedulers.api import Stream
 from torchx.schedulers.docker_scheduler import (
     DockerContainer,
@@ -28,7 +31,14 @@ def _test_app() -> specs.AppDef:
         name="trainer",
         image="pytorch/torchx:latest",
         entrypoint="main",
-        args=["--output-path", specs.macros.img_root, "--app-id", specs.macros.app_id],
+        args=[
+            "--output-path",
+            specs.macros.img_root,
+            "--app-id",
+            specs.macros.app_id,
+            "--rank0-env",
+            specs.macros.rank0_env,
+        ],
         env={"FOO": "bar"},
         resource=specs.Resource(
             cpu=2,
@@ -66,6 +76,8 @@ def test_submit_dryrun(self) -> None:
                     "",
                     "--app-id",
                     "app_name_42",
+                    "--rank0-env",
+                    "TORCHX_RANK0_HOST",
                 ],
                 kwargs={
                     "device_requests": [
@@ -76,6 +88,7 @@ def test_submit_dryrun(self) -> None:
                     ],
                     "environment": {
                         "FOO": "bar",
+                        "TORCHX_RANK0_HOST": "app_name_42-trainer-0",
                     },
                     "labels": {
                         "torchx.pytorch.org/app-id": "app_name_42",
@@ -102,13 +115,16 @@ def test_submit_dryrun(self) -> None:
     def test_copy_env(self) -> None:
         app = _test_app()
         cfg = {"copy_env": ["FOO_*", "BAR_*"]}
-        info = self.scheduler._submit_dryrun(app, cfg)
+        with patch("torchx.schedulers.docker_scheduler.make_unique") as make_unique_ctx:
+            make_unique_ctx.return_value = "app_name_42"
+            info = self.scheduler._submit_dryrun(app, cfg)
         self.assertEqual(
             info.request.containers[0].kwargs["environment"],
             {
                 "FOO": "bar",
                 "FOO_1": "f1",
                 "BAR_1": "b1",
+                "TORCHX_RANK0_HOST": "app_name_42-trainer-0",
             },
         )
@@ -317,22 +333,11 @@ def test_docker_submit_error_retries(self) -> None:
         self.assertEqual(AppState.FAILED, desc.state)
 
     def test_docker_submit_dist(self) -> None:
-        app = AppDef(
-            name="test-app",
-            roles=[
-                Role(
-                    name="ping",
-                    image="busybox",
-                    entrypoint="sh",
-                    args=[
-                        "-c",
-                        f"sleep 1; ping -c1 {specs.macros.app_id}-ping-0; sleep 1",
-                    ],
-                    num_replicas=2,
-                ),
-            ],
-        )
-        app_id = self.scheduler.submit(app, cfg={})
+        workspace = "memory://docker_submit_dist/"
+        with fsspec.open(posixpath.join(workspace, "main.py"), "wt") as f:
+            f.write("print('hello world')\n")
+        app = ddp(script="main.py", j="2x1")
+        app_id = self.scheduler.submit(app, cfg={}, workspace=workspace)
         print(app_id)
 
         desc = self.wait(app_id)
diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py
index 748749da6..8d316b3e8 100644
--- a/torchx/schedulers/test/kubernetes_scheduler_test.py
+++ b/torchx/schedulers/test/kubernetes_scheduler_test.py
@@ -29,12 +29,19 @@ SKIP_DOCKER: bool = not has_docker()
 
 
-def _test_app() -> specs.AppDef:
+def _test_app(num_replicas: int = 1) -> specs.AppDef:
     trainer_role = specs.Role(
-        name="trainer",
+        name="trainer_foo",
         image="pytorch/torchx:latest",
         entrypoint="main",
-        args=["--output-path", specs.macros.img_root, "--app-id", specs.macros.app_id],
+        args=[
+            "--output-path",
+            specs.macros.img_root,
+            "--app-id",
+            specs.macros.app_id,
+            "--rank0-env",
+            specs.macros.rank0_env,
+        ],
         env={"FOO": "bar"},
         resource=specs.Resource(
             cpu=2,
@@ -42,7 +49,7 @@ def _test_app() -> specs.AppDef:
             gpu=4,
         ),
         port_map={"foo": 1234},
-        num_replicas=1,
+        num_replicas=num_replicas,
         max_retries=3,
     )
@@ -68,7 +75,15 @@ def test_app_to_resource_resolved_macros(self) -> None:
             .spec.containers[0]
             .command
         )
-        expected_cmd = ["main", "--output-path", "", "--app-id", unique_app_name]
+        expected_cmd = [
+            "main",
+            "--output-path",
+            "",
+            "--app-id",
+            unique_app_name,
+            "--rank0-env",
+            "TORCHX_RANK0_HOST",
+        ]
         self.assertEqual(expected_cmd, actual_cmd)
 
     def test_retry_policy_not_set(self) -> None:
@@ -118,6 +133,8 @@ def test_role_to_pod(self) -> None:
                 specs.macros.img_root,
                 "--app-id",
                 specs.macros.app_id,
+                "--rank0-env",
+                specs.macros.rank0_env,
             ],
             image="pytorch/torchx:latest",
             name="name",
@@ -181,7 +198,7 @@ def test_submit_dryrun(self) -> None:
   schedulerName: volcano
   tasks:
   - maxRetry: 3
-    name: trainer-0
+    name: trainerfoo-0
     policies:
     - action: RestartJob
       event: PodEvicted
@@ -196,7 +213,7 @@ def test_submit_dryrun(self) -> None:
           torchx.pytorch.org/app-name: test
           torchx.pytorch.org/replica-id: '0'
           torchx.pytorch.org/role-index: '0'
-          torchx.pytorch.org/role-name: trainer
+          torchx.pytorch.org/role-name: trainer_foo
           torchx.pytorch.org/version: {torchx.__version__}
       spec:
         containers:
@@ -206,11 +223,15 @@ def test_submit_dryrun(self) -> None:
           - ''
           - --app-id
           - app-name-42
+          - --rank0-env
+          - TORCHX_RANK0_HOST
           env:
           - name: FOO
             value: bar
+          - name: TORCHX_RANK0_HOST
+            value: localhost
           image: pytorch/torchx:latest
-          name: trainer-0
+          name: trainerfoo-0
           ports:
           - containerPort: 1234
             name: foo
@@ -227,6 +248,30 @@ def test_submit_dryrun(self) -> None:
 """,
         )
 
+    def test_rank0_env(self) -> None:
+        from kubernetes.client.models import (
+            V1EnvVar,
+        )
+
+        scheduler = create_scheduler("test")
+        app = _test_app(num_replicas=2)
+        cfg = {"queue": "testqueue"}
+        with patch(
+            "torchx.schedulers.kubernetes_scheduler.make_unique"
+        ) as make_unique_ctx:
+            make_unique_ctx.return_value = "app-name-42"
+            info = scheduler._submit_dryrun(app, cfg)
+
+        # pyre-fixme[16]: `object` has no attribute `__getitem__`.
+        tasks = info.request.resource["spec"]["tasks"]
+        container0 = tasks[0]["template"].spec.containers[0]
+        self.assertIn("TORCHX_RANK0_HOST", container0.command)
+        self.assertIn(
+            V1EnvVar(name="TORCHX_RANK0_HOST", value="localhost"), container0.env
+        )
+        container1 = tasks[1]["template"].spec.containers[0]
+        self.assertIn("VC_TRAINERFOO_0_HOSTS", container1.command)
+
     def test_submit_dryrun_patch(self) -> None:
         scheduler = create_scheduler("test")
         app = _test_app()
diff --git a/torchx/schedulers/test/local_scheduler_test.py b/torchx/schedulers/test/local_scheduler_test.py
index 2ae74a761..63cbec443 100644
--- a/torchx/schedulers/test/local_scheduler_test.py
+++ b/torchx/schedulers/test/local_scheduler_test.py
@@ -523,6 +523,7 @@ def test_submit_dryrun_without_log_dir_cfg(self, _) -> None:
             self.assertEqual([role.entrypoint, *role.args], replica_param.args)
             self.assertEqual(
                 {
+                    "TORCHX_RANK0_HOST": "localhost",
                     ERR_FILE_ENV: join(replica_log_dir, "error.json"),
                     **role.env,
                 },
@@ -571,6 +572,7 @@ def test_submit_dryrun_with_log_dir_cfg(
             self.assertEqual([role.entrypoint, *role.args], replica_param.args)
             self.assertEqual(
                 {
+                    "TORCHX_RANK0_HOST": "localhost",
                     ERR_FILE_ENV: join(replica_log_dir, "error.json"),
                     **role.env,
                 },
diff --git a/torchx/schedulers/test/ray_scheduler_test.py b/torchx/schedulers/test/ray_scheduler_test.py
index abec79a1e..917c028f2 100644
--- a/torchx/schedulers/test/ray_scheduler_test.py
+++ b/torchx/schedulers/test/ray_scheduler_test.py
@@ -11,22 +11,24 @@
 from unittest import TestCase
 from unittest.mock import patch
 
-import ray
 from torchx.schedulers import get_schedulers
 from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse
-from torchx.schedulers.ray import ray_driver
 from torchx.schedulers.ray.ray_common import RayActor
 from torchx.schedulers.ray_scheduler import (
-    RayScheduler,
-    _logger,
     has_ray,
-    RayJob,
-    serialize,
 )
 from torchx.specs import AppDef, CfgVal, Resource, Role, runopts
 
 if has_ray():
+    import ray
+    from torchx.schedulers.ray import ray_driver
+    from torchx.schedulers.ray_scheduler import (
+        RayScheduler,
+        _logger,
+        RayJob,
+        serialize,
+    )
 
     class RaySchedulerRegistryTest(TestCase):
         def test_get_schedulers_returns_ray_scheduler(self) -> None:
diff --git a/torchx/specs/api.py b/torchx/specs/api.py
index 1848fe1a8..1f7e20ab0 100644
--- a/torchx/specs/api.py
+++ b/torchx/specs/api.py
@@ -137,11 +137,19 @@ class macros:
     app_id = "${app_id}"
     replica_id = "${replica_id}"
 
+    # rank0_env will be filled with the name of the environment variable that
+    # provides the master host address. To get the actual hostname the
+    # environment variable must be resolved by the app via either shell
+    # expansion (wrap sh/bash) or via the application.
+    # This may not be available on all schedulers.
+    rank0_env = "${rank0_env}"
+
     @dataclass
     class Values:
         img_root: str
         app_id: str
         replica_id: str
+        rank0_env: str
         base_img_root: str = "DEPRECATED"
 
         def apply(self, role: "Role") -> "Role":
diff --git a/torchx/specs/test/api_test.py b/torchx/specs/test/api_test.py
index af9409171..c9660e5cf 100644
--- a/torchx/specs/test/api_test.py
+++ b/torchx/specs/test/api_test.py
@@ -383,6 +383,7 @@ def test_substitute(self) -> None:
             app_id="app_id",
             replica_id="replica_id",
             base_img_root="base_img_root",
+            rank0_env="rank0_env",
         )
         for key, val in asdict(v).items():
             template = f"tmpl-{getattr(macros, key)}"
@@ -401,6 +402,7 @@ def test_apply(self) -> None:
             app_id="app_id",
             replica_id="replica_id",
             base_img_root="base_img_root",
+            rank0_env="rank0_env",
         )
         newrole = v.apply(role)
         self.assertNotEqual(newrole, role)
diff --git a/torchx/test/version_test.py b/torchx/test/version_test.py
index dc5e56e51..1357c9b4a 100644
--- a/torchx/test/version_test.py
+++ b/torchx/test/version_test.py
@@ -16,9 +16,6 @@ def test_can_get_version(self) -> None:
         self.assertIsNotNone(torchx.IMAGE)
 
     def test_images(self) -> None:
-        from torchx.version import __version__, TORCHX_IMAGE, EXAMPLES_IMAGE
+        from torchx.version import __version__, TORCHX_IMAGE
 
         self.assertEqual(TORCHX_IMAGE, f"ghcr.io/pytorch/torchx:{__version__}")
-        self.assertEqual(
-            EXAMPLES_IMAGE, f"ghcr.io/pytorch/torchx-examples:{__version__}"
-        )
diff --git a/torchx/version.py b/torchx/version.py
index e7bd6a6ef..6a7996d27 100644
--- a/torchx/version.py
+++ b/torchx/version.py
@@ -19,4 +19,3 @@
 # Use the github container registry images corresponding to the current package
 # version.
 TORCHX_IMAGE = f"ghcr.io/pytorch/torchx:{__version__}"
-EXAMPLES_IMAGE = f"ghcr.io/pytorch/torchx-examples:{__version__}"
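
Example (editor's sketch, not part of the patch): based only on the component code above, the reworked dist.ddp wraps torch.distributed.run in a bash -c command and exposes the c10d rendezvous port. A minimal check, assuming a torchx build that contains this patch:

    from torchx.components.dist import ddp

    # exactly one of script= / m= may be given; j is {nnodes}x{nproc_per_node}
    app = ddp(script="main.py", j="2x1", max_restarts=3)
    role = app.roles[0]

    print(role.entrypoint)  # "bash"
    print(role.args[0])     # "-c"
    print(role.args[1])     # python -m torch.distributed.run --rdzv_backend c10d ...
    print(role.port_map)    # {"c10d": 29500}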
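A second sketch, grounded in the scheduler and spec changes above (the image name below is a placeholder): each scheduler substitutes the new ${rank0_env} macro with the name of an environment variable it guarantees to set (TORCHX_RANK0_HOST for local/docker, MASTER_ADDR for Ray, SLURM_JOB_NODELIST_HET_GROUP_0 for Slurm), and the bash wrapper expands that variable at runtime to reach the rank0 host:

    from torchx import specs

    role = specs.Role(
        name="trainer",
        image="example/image",  # placeholder image, for illustration only
        entrypoint="bash",
        args=["-c", f"echo rank0 env var is {specs.macros.rank0_env}"],
    )

    values = specs.macros.Values(
        img_root="",
        app_id="app-id-1",
        replica_id="0",
        rank0_env="TORCHX_RANK0_HOST",  # what the local/docker schedulers pass
    )
    resolved = values.apply(role)
    print(resolved.args[1])  # echo rank0 env var is TORCHX_RANK0_HOST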