diff --git a/dev-requirements.txt b/dev-requirements.txt index 3072df1de..344dd5c95 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -10,8 +10,8 @@ kfp==1.8.9 moto==2.2.12 pyre-extensions==0.0.21 pytorch-lightning==1.5.6 -ray==1.9.0 s3fs==2021.10.1 +ray[default]==1.9.2 torch-model-archiver==0.4.2 torch==1.10.0 torchserve==0.4.2 diff --git a/docs/source/index.rst b/docs/source/index.rst index 6a9d5b4c8..93747761f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -96,6 +96,7 @@ Works With schedulers/local schedulers/kubernetes schedulers/slurm + schedulers/ray .. _Pipelines: .. toctree:: diff --git a/docs/source/schedulers/ray.rst b/docs/source/schedulers/ray.rst new file mode 100644 index 000000000..9095f8541 --- /dev/null +++ b/docs/source/schedulers/ray.rst @@ -0,0 +1,8 @@ +Ray +================= + +.. automodule:: torchx.schedulers.ray_scheduler +.. currentmodule:: torchx.schedulers.ray_scheduler + +.. autoclass:: RayScheduler + :members: diff --git a/scripts/component_integration_tests.py b/scripts/component_integration_tests.py index 8d4563bcd..597b3b930 100755 --- a/scripts/component_integration_tests.py +++ b/scripts/component_integration_tests.py @@ -37,6 +37,13 @@ def get_k8s_sched_info(image: str) -> SchedulerInfo: return SchedulerInfo(name="kubernetes", image=image, cfg=cfg) +def get_ray_sched_info(image: str) -> SchedulerInfo: + cfg = { + "namespace": "torchx-dev", + } + return SchedulerInfo(name="ray", image=image, cfg=cfg) + + def get_local_cwd_sched_info(image: str) -> SchedulerInfo: return SchedulerInfo(name="local_cwd", image=image) diff --git a/torchx/cli/cmd_log.py b/torchx/cli/cmd_log.py index c2ec9d482..5b68388a1 100644 --- a/torchx/cli/cmd_log.py +++ b/torchx/cli/cmd_log.py @@ -28,7 +28,7 @@ def validate(job_identifier: str) -> None: - if not re.match(r"^\w+://[^/.]*/[^/.]+(/[^/.]+(/(\d+,?)+)?)?$", job_identifier): + if not re.match(r"^\w+://[^/]*/[^/]+(/[^/]+(/(\d+,?)+)?)?$", job_identifier): logger.error( f"{job_identifier} is not of the form {ID_FORMAT}", ) diff --git a/torchx/components/test/utils_test.py b/torchx/components/test/utils_test.py index b46555d24..b247a544a 100644 --- a/torchx/components/test/utils_test.py +++ b/torchx/components/test/utils_test.py @@ -15,6 +15,9 @@ def test_sh(self) -> None: def test_python(self) -> None: self.validate(utils, "python") + def test_binary(self) -> None: + self.validate(utils, "binary") + def test_touch(self) -> None: self.validate(utils, "touch") diff --git a/torchx/components/utils.py b/torchx/components/utils.py index 5e3f139a2..7fbaed67a 100644 --- a/torchx/components/utils.py +++ b/torchx/components/utils.py @@ -147,6 +147,36 @@ def python( ) +def binary( + *args: str, + entrypoint: str, + name: str = "torchx_utils_python", + num_replicas: int = 1, +) -> specs.AppDef: + """ + Test component + + Args: + args: arguments passed to the program in sys.argv[1:] (ignored with `--c`) + name: name of the job + num_replicas: number of copies to run (each on its own container) + :return: + """ + return specs.AppDef( + name=name, + roles=[ + specs.Role( + name="binary", + image="", + entrypoint=entrypoint, + num_replicas=num_replicas, + resource=specs.Resource(cpu=2, gpu=0, memMB=4096), + args=[*args], + ) + ], + ) + + def copy(src: str, dst: str, image: str = torchx.IMAGE) -> specs.AppDef: """ copy copies the file from src to dst. 
src and dst can be any valid fsspec diff --git a/torchx/examples/apps/aws/ray/ray_cluster.yaml b/torchx/examples/apps/aws/ray/ray_cluster.yaml new file mode 100644 index 000000000..160d9076d --- /dev/null +++ b/torchx/examples/apps/aws/ray/ray_cluster.yaml @@ -0,0 +1,142 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: gpu-docker + +min_workers: 1 +max_workers: 4 + +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 + +# This executes all commands on all nodes in the docker container, +# and opens all the necessary ports to support the Ray cluster. +# Empty string means disabled. +docker: + image: "rayproject/ray-ml:latest-gpu" + # image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull + container_name: "ray_nvidia_docker" # e.g. ray_docker + + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-2 + # Availability zone(s), comma-separated, that nodes may be launched in. + # Nodes are currently spread between zones by a round-robin approach, + # however this implementation detail should not be relied upon. + availability_zone: us-west-2a,us-west-2b + security_group: + GroupName: dashboard_group + IpPermissions: + - FromPort: 20002 + ToPort: 20002 + IpProtocol: TCP + IpRanges: + - CidrIp: 0.0.0.0/0 + + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu +# By default Ray creates a new private keypair, but you can also use your own. +# If you do so, make sure to also set "KeyName" in the head and worker node +# configurations below. +# ssh_private_key: /path/to/your/key.pem + +# Tell the autoscaler the allowed node types and the resources they provide. +# The key is the name of the node type, which is just for debugging purposes. +# The node config specifies the launch config and physical instance type. +available_node_types: + # GPU head node. + ray.head.gpu: + # worker_image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull + # The node type's CPU and GPU resources are auto-detected based on AWS instance type. + # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler. + # You can also set custom resources. + # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set + # resources: {"CPU": 1, "GPU": 1, "custom": 5} + resources: {} + # Provider-specific config for this node type, e.g. instance type. By default + # Ray will auto-configure unspecified fields such as SubnetId and KeyName. + # For more documentation on available fields, see: + # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances + node_config: + InstanceType: p2.xlarge + ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30 + # You can provision additional disk space with a conf as follows + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 100 + # Additional options in the boto docs. + # CPU workers. + ray.worker.default: + # Override global docker setting. 
+ # This node type will run a CPU image, + # rather than the GPU image specified in the global docker settings. + docker: + worker_image: "rayproject/ray-ml:latest-cpu" + # The minimum number of nodes of this type to launch. + # This number should be >= 0. + min_workers: 1 + # The maximum number of workers nodes of this type to launch. + # This takes precedence over min_workers. + max_workers: 2 + # The node type's CPU and GPU resources are auto-detected based on AWS instance type. + # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler. + # You can also set custom resources. + # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set + # resources: {"CPU": 1, "GPU": 1, "custom": 5} + resources: {} + # Provider-specific config for this node type, e.g. instance type. By default + # Ray will auto-configure unspecified fields such as SubnetId and KeyName. + # For more documentation on available fields, see: + # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances + node_config: + InstanceType: m5.large + ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30 + # Run workers on spot by default. Comment this out to use on-demand. + InstanceMarketOptions: + MarketType: spot + # Additional options can be found in the boto docs, e.g. + # SpotOptions: + # MaxPrice: MAX_HOURLY_PRICE + # Additional options in the boto docs. + +# Specify the node type of the head node (as configured above). +head_node_type: ray.head.gpu + +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. +file_mounts: { + # "/path1/on/remote/machine": "/path1/on/local/machine", + # "/path2/on/remote/machine": "/path2/on/local/machine", +} + +# List of shell commands to run to set up nodes. +# NOTE: rayproject/ray:latest has ray latest bundled +setup_commands: [] +# - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl +# - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl" + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: + - pip install boto3==1.4.8 # 1.4.8 adds InstanceMarketOptions + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --dashboard-port 20002 --dashboard-host=0.0.0.0 --include-dashboard True --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/torchx/schedulers/__init__.py b/torchx/schedulers/__init__.py index 20e7269e8..83c48fada 100644 --- a/torchx/schedulers/__init__.py +++ b/torchx/schedulers/__init__.py @@ -5,7 +5,7 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
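For reference, the cluster described by `torchx/examples/apps/aws/ray/ray_cluster.yaml` above can be driven programmatically through Ray's autoscaler SDK, the same module this patch uses for `get_head_node_ip`. A minimal sketch, assuming a Ray 1.9.x install where `create_or_update_cluster` and `teardown_cluster` are available (only `get_head_node_ip` is actually exercised by the scheduler itself); port 20002 comes from the security group and `--dashboard-port` settings in the config above:

```python
# Sketch: bring the example cluster up, find its dashboard, tear it down.
# create_or_update_cluster / teardown_cluster are assumed to exist in the
# installed ray.autoscaler.sdk; get_head_node_ip is what ray_scheduler.py calls.
from ray.autoscaler import sdk as ray_autoscaler_sdk

cluster_yaml = "torchx/examples/apps/aws/ray/ray_cluster.yaml"

ray_autoscaler_sdk.create_or_update_cluster(cluster_yaml)
head_ip = ray_autoscaler_sdk.get_head_node_ip(cluster_yaml)
print(f"Ray dashboard (exposed on port 20002 by the config above): http://{head_ip}:20002")

# ... submit TorchX jobs against f"{head_ip}:20002" ...
# ray_autoscaler_sdk.teardown_cluster(cluster_yaml)
```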
-from typing import Dict +from typing import Dict, Optional import torchx.schedulers.docker_scheduler as docker_scheduler import torchx.schedulers.kubernetes_scheduler as kubernetes_scheduler @@ -22,6 +22,19 @@ def __call__(self, session_name: str, **kwargs: object) -> Scheduler: ... +def try_get_ray_scheduler() -> Optional[SchedulerFactory]: + try: + from torchx.schedulers.ray_scheduler import _has_ray # @manual + + if _has_ray: + import torchx.schedulers.ray_scheduler as ray_scheduler # @manual + + return ray_scheduler.create_scheduler + + except ImportError: # pragma: no cover + return None + + def get_scheduler_factories() -> Dict[str, SchedulerFactory]: """ get_scheduler_factories returns all the available schedulers names and the @@ -29,6 +42,7 @@ def get_scheduler_factories() -> Dict[str, SchedulerFactory]: The first scheduler in the dictionary is used as the default scheduler. """ + default_schedulers: Dict[str, SchedulerFactory] = { "local_docker": docker_scheduler.create_scheduler, "local_cwd": local_scheduler.create_cwd_scheduler, @@ -36,6 +50,10 @@ def get_scheduler_factories() -> Dict[str, SchedulerFactory]: "kubernetes": kubernetes_scheduler.create_scheduler, } + ray_scheduler_creator = try_get_ray_scheduler() + if ray_scheduler_creator: + default_schedulers["ray"] = ray_scheduler_creator + return load_group( "torchx.schedulers", default=default_schedulers, diff --git a/torchx/schedulers/ray/__init__.py b/torchx/schedulers/ray/__init__.py new file mode 100644 index 000000000..a9fdb3b99 --- /dev/null +++ b/torchx/schedulers/ray/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. diff --git a/torchx/schedulers/ray/ray_common.py b/torchx/schedulers/ray/ray_common.py new file mode 100644 index 000000000..a0b27ce4a --- /dev/null +++ b/torchx/schedulers/ray/ray_common.py @@ -0,0 +1,36 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +from dataclasses import dataclass, field +from typing import Dict + + +@dataclass +class RayActor: + """Describes an actor (a.k.a. role in TorchX terms). + + Attributes: + name: + The name of the actor. + command: + The command that the actor should run as a subprocess. + env: + The environment variables to set before executing the command. + num_replicas: + The number of replicas (i.e. Ray actors) to run. + num_cpus: + The number of CPUs to allocate. + num_gpus: + The number of GPUs to allocate. + """ + + name: str + command: str + env: Dict[str, str] = field(default_factory=dict) + num_replicas: int = 1 + num_cpus: int = 1 + num_gpus: int = 0 + # TODO: memory_size, max_retries, retry_policy diff --git a/torchx/schedulers/ray/ray_driver.py b/torchx/schedulers/ray/ray_driver.py new file mode 100644 index 000000000..1ff50ca9f --- /dev/null +++ b/torchx/schedulers/ray/ray_driver.py @@ -0,0 +1,174 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
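Because `try_get_ray_scheduler` swallows the `ImportError`, the `"ray"` entry only shows up in the factory map when `ray[default]` is importable. A minimal sketch of how a caller can probe for it before constructing a scheduler (the session name is arbitrary):

```python
# "ray" appears in the factory map only when the ray extras are installed;
# each factory is called with a session name and returns a Scheduler instance.
from torchx.schedulers import get_scheduler_factories

factories = get_scheduler_factories()
if "ray" in factories:
    scheduler = factories["ray"]("my_session")   # RayScheduler(session_name="my_session")
else:
    # Fall back to one of the always-registered schedulers.
    scheduler = factories["local_docker"]("my_session")
```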
+ +import contextlib +import importlib +import json +import logging +import os +import sys +from typing import Dict, List, Optional, Tuple + +import ray +from ray.train.utils import get_address_and_port +from ray.util.placement_group import PlacementGroup +from torchx.schedulers.ray.ray_common import RayActor + +_logger: logging.Logger = logging.getLogger(__name__) +_logger.setLevel(logging.INFO) + + +@contextlib.contextmanager +def redirect_argv(args: List[str]): # pyre-ignore[3] + _argv = sys.argv[:] + sys.argv = args + yield + sys.argv = _argv + + +@ray.remote +class CommandActor: # pragma: no cover + def __init__(self, command: str, env: Dict[str, str]) -> None: + self.args: List[str] = command.split(" ") + self.path: str = self.args[0] + + for k, v in env.items(): + os.environ[k] = v + + def exec_module(self) -> None: + spec: Optional[ + importlib.machinery.ModuleSpec + ] = importlib.util.spec_from_file_location("__main__", self.path) + if spec: # pragma: no cover + train = importlib.util.module_from_spec(spec) + with redirect_argv(self.args): + spec.loader.exec_module(train) # pyre-ignore[16] + + def get_actor_address_and_port(self) -> Tuple[str, int]: + return get_address_and_port() + + def set_address_and_port(self, address: str, port: int) -> None: + os.environ["MASTER_PORT"] = str(port) + os.environ["MASTER_ADDR"] = address + + +def load_actor_json(filename: str) -> List[RayActor]: + with open(filename) as f: + actors: List[RayActor] = [] + # Yes this is gross but it works + actor_dict = json.load(f) + actor_dict = json.loads(actor_dict) + for actor in actor_dict: + actors.append(RayActor(**actor)) + return actors + + +def create_placement_groups(actors: List[RayActor]) -> List[PlacementGroup]: + pgs: List[PlacementGroup] = [] + for actor in actors: + bundle = {"CPU": actor.num_cpus, "GPU": actor.num_gpus} + bundles = [bundle] * actor.num_replicas + + # To change the strategy type + # refer to available options here https://docs.ray.io/en/latest/placement-group.html#pgroup-strategy + pg = ray.util.placement_group(bundles, strategy="SPREAD") + pgs.append(pg) + + _logger.info("Waiting for placement group to start.") + ready = pg.wait(timeout_seconds=100) + + if ready: + _logger.info("Placement group has started.") + _logger.info("Starting remote function") + else: # pragma: no cover + raise TimeoutError( + "Placement group creation timed out. Make sure " + "your cluster either has enough resources or use " + "an autoscaling cluster. Current resources " + "available: {}, resources requested by the " + "placement group: {}".format(ray.available_resources(), pg.bundle_specs) + ) + return pgs + + +def create_command_actors( + actors: List[RayActor], pgs: List[PlacementGroup] +) -> List[CommandActor]: + + # 1. Create actors + # 2. For each actor get rank 0 address and port + # 3. 
Set address and port in command actor + command_actors: List[CommandActor] = [] + # address, port = get_address_and_port() + for i in range(len(actors)): + world_size = actors[i].num_replicas + actors_for_this_group = [] + + for rank in range(world_size): + + # Environment variables for distributed training + rank_env = { + "WORLD_SIZE": str(world_size), + "RANK": str(rank), + } + + actor_and_rank_env = {**actors[i].env, **rank_env} + + actors_for_this_group.append( + CommandActor.options( # pyre-ignore[16] + placement_group=pgs[i], + num_cpus=actors[i].num_cpus, + num_gpus=actors[i].num_gpus, + ).remote(actors[i].command, actor_and_rank_env) + ) + + rank_0_address, rank_0_port = ray.get( + actors_for_this_group[0].get_actor_address_and_port.remote() + ) + + for actor in actors_for_this_group: + ray.get(actor.set_address_and_port.remote(rank_0_address, rank_0_port)) + + command_actors.extend(actors_for_this_group) + + return command_actors + + +if __name__ == "__main__": # pragma: no cover + _logger.debug("Reading actor.json") + + actors: List[RayActor] = load_actor_json("actors.json") + os.remove("actors.json") + + _logger.debug("Creating Ray placement groups") + ray.init(address="auto", namespace="torchx-ray") + pgs: List[PlacementGroup] = create_placement_groups(actors) + + _logger.debug("Getting command actors") + command_actors: List[CommandActor] = create_command_actors(actors, pgs) + + _logger.debug("Running Ray actors") + active_workers = [ # pyre-ignore + command_actor.exec_module.remote() # pyre-ignore + for command_actor in command_actors + ] + + # Await return result of remote ray function + while len(active_workers) > 0: + completed_workers, active_workers = ray.wait(active_workers) + # If a failure occurs the ObjectRef will be marked as completed. + # Calling ray.get will expose the failure as a RayActorError. + for object_ref in completed_workers: + try: + ray.get(object_ref) + _logger.info("Ray remote function promise succesfully returned") + + # If an error occurs during the actor execution, this error will get propagated as-is to the driver when you call ray.get(). + # For example, if a ValueError is raised in the actor method call, this will be raised as a ValueError on the driver. + # These exceptions will not be caught in this try-except clause + except ray.exceptions.RayActorError as exc: + _logger.info("Ray Actor Error") + _logger.error("Ray Actor error", exc) diff --git a/torchx/schedulers/ray_scheduler.py b/torchx/schedulers/ray_scheduler.py index b23e5bdd5..4f0fb7a36 100644 --- a/torchx/schedulers/ray_scheduler.py +++ b/torchx/schedulers/ray_scheduler.py @@ -4,236 +4,329 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
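A small round-trip sketch of the `actors.json` hand-off that `load_actor_json` above decodes twice: `serialize()` in `ray_scheduler.py` below `json.dumps` the actor list and then `json.dump`s the resulting *string*, so the file holds a JSON-encoded string of a JSON array. The actor values are illustrative, and the sketch assumes Ray is installed since both modules import it at module scope:

```python
import json
import os
from tempfile import mkdtemp

from torchx.schedulers.ray.ray_common import RayActor
from torchx.schedulers.ray import ray_driver
from torchx.schedulers.ray_scheduler import serialize

# Illustrative actor; field names/defaults come from RayActor above.
actors = [RayActor(name="trainer", command="python train.py", num_replicas=2, num_gpus=1)]

dirpath = mkdtemp()
serialize(actors, dirpath)                                  # writes <dirpath>/actors.json
loaded = ray_driver.load_actor_json(os.path.join(dirpath, "actors.json"))
assert loaded == actors                                     # dataclass equality round-trips
```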
+import dataclasses +import json import logging import os +import time from dataclasses import dataclass, field from datetime import datetime -from typing import Mapping, Any, Dict, Iterable, List, Optional, Set, Type - -from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream +from shutil import copy2, rmtree, copytree +from tempfile import mkdtemp +from typing import Any, Dict, List, Mapping, Optional, Set, Type + +from torchx.schedulers.api import ( + AppDryRunInfo, + AppState, + DescribeAppResponse, + Scheduler, + Stream, +) from torchx.schedulers.ids import make_unique -from torchx.specs import AppDef, CfgVal, SchedulerBackend, macros, runopts - +from torchx.schedulers.ray.ray_common import RayActor +from torchx.specs import ( + AppDef, + CfgVal, + SchedulerBackend, + macros, + runopts, + Role, + RoleStatus, + ReplicaStatus, +) try: - import ray # @manual # noqa: F401 + from ray.autoscaler import sdk as ray_autoscaler_sdk + from ray.dashboard.modules.job.common import JobStatus + from ray.dashboard.modules.job.sdk import JobSubmissionClient _has_ray = True + except ImportError: _has_ray = False +if _has_ray: + + _logger: logging.Logger = logging.getLogger(__name__) + + _ray_status_to_torchx_appstate: Dict[JobStatus, AppState] = { + JobStatus.PENDING: AppState.PENDING, + JobStatus.RUNNING: AppState.RUNNING, + JobStatus.SUCCEEDED: AppState.SUCCEEDED, + JobStatus.FAILED: AppState.FAILED, + JobStatus.STOPPED: AppState.CANCELLED, + } + + class _EnhancedJSONEncoder(json.JSONEncoder): + def default(self, o: RayActor): # pyre-ignore[3] + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + return super().default(o) + + def serialize( + actors: List[RayActor], dirpath: str, output_filename: str = "actors.json" + ) -> None: + actors_json = json.dumps(actors, cls=_EnhancedJSONEncoder) + with open(os.path.join(dirpath, output_filename), "w") as tmp: + json.dump(actors_json, tmp) + + def has_ray() -> bool: + """Indicates whether Ray is installed in the current Python environment.""" + return _has_ray + + @dataclass + class RayJob: + """Represents a job that should be run on a Ray cluster. + + Attributes: + app_id: + The unique ID of the application (a.k.a. job). + cluster_config_file: + The Ray cluster configuration file. + cluster_name: + The cluster name to use. + dashboard_address: + The existing dashboard IP address to connect to + working_dir: + The working directory to copy to the cluster + requirements: + The libraries to install on the cluster per requirements.txt + actors: + The Ray actors which represent the job to be run. This attribute is + dumped to a JSON file and copied to the cluster where `ray_main.py` + uses it to initiate the job. 
+ """ + + app_id: str + cluster_config_file: Optional[str] = None + cluster_name: Optional[str] = None + dashboard_address: Optional[str] = None + working_dir: Optional[str] = None + requirements: Optional[str] = None + scripts: Set[str] = field(default_factory=set) + actors: List[RayActor] = field(default_factory=list) + + class RayScheduler(Scheduler): + def __init__(self, session_name: str) -> None: + super().__init__("ray", session_name) + + # TODO: Add address as a potential CLI argument after writing ray.status() or passing in config file + def run_opts(self) -> runopts: + opts = runopts() + opts.add( + "cluster_config_file", + type_=str, + required=False, + help="Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.", + ) + opts.add( + "cluster_name", + type_=str, + help="Override the configured cluster name.", + ) + opts.add( + "dashboard_address", + type_=str, + required=False, + default="127.0.0.1:8265", + help="Use ray status to get the dashboard address you will submit jobs against", + ) + opts.add( + "working_dir", + type_=str, + help="Copy the the working directory containing the Python scripts to the cluster.", + ) + opts.add("requirements", type_=str, help="Path to requirements.txt") + return opts + + def schedule(self, dryrun_info: AppDryRunInfo[RayJob]) -> str: + cfg: RayJob = dryrun_info.request + + # Create serialized actors for ray_driver.py + actors = cfg.actors + dirpath = mkdtemp() + serialize(actors, dirpath) + + ip_address = "127.0.0.1:8265" + + if cfg.cluster_config_file: + ip_address = ray_autoscaler_sdk.get_head_node_ip( + cfg.cluster_config_file + ) # pragma: no cover + if cfg.dashboard_address: + ip_address = cfg.dashboard_address + + # 0. Create Job Client + client: JobSubmissionClient = JobSubmissionClient(f"http://{ip_address}") + + # 1. Copy working directory + if cfg.working_dir: + copytree(cfg.working_dir, dirpath, dirs_exist_ok=True) + + # 2. Copy Ray driver utilities + current_directory = os.path.dirname(os.path.abspath(__file__)) + copy2(os.path.join(current_directory, "ray", "ray_driver.py"), dirpath) + copy2(os.path.join(current_directory, "ray", "ray_common.py"), dirpath) + + # 3. Parse requirements.txt + reqs: List[str] = [] + if cfg.requirements: # pragma: no cover + with open(cfg.requirements) as f: + for line in f: + reqs.append(line.strip()) + + # 4. Submit Job via the Ray Job Submission API + try: + job_id: str = client.submit_job( + # we will pack, hash, zip, upload, register working_dir in GCS of ray cluster + # and use it to configure your job execution. + entrypoint="python ray_driver.py", + runtime_env={"working_dir": dirpath, "pip": reqs}, + # job_id = cfg.app_id + ) + + except Exception as e: + raise + + finally: + rmtree(dirpath) -_logger: logging.Logger = logging.getLogger(__name__) - - -def has_ray() -> bool: - """Indicates whether Ray is installed in the current Python environment.""" - return _has_ray - - -@dataclass -class RayActor: - """Describes an actor (a.k.a. role in TorchX terms). - - Attributes: - name: - The name of the actor. - command: - The command that the actor should run as a subprocess. - env: - The environment variables to set before executing the command. - num_replicas: - The number of replicas (i.e. Ray actors) to run. - num_cpus: - The number of CPUs to allocate. - num_gpus: - The number of GPUs to allocate. 
- """ - - name: str - command: str - env: Dict[str, str] = field(default_factory=dict) - num_replicas: int = 1 - num_cpus: int = 1 - num_gpus: int = 0 - # TODO: memory_size, max_retries, retry_policy - - -@dataclass -class RayJob: - """Represents a job that should be run on a Ray cluster. - - Attributes: - app_id: - The unique ID of the application (a.k.a. job). - cluster_config_file: - The Ray cluster configuration file. - cluster_name: - The cluster name to use. - copy_script: - A boolean value indicating whether to copy the script files to the - cluster. - copy_script_dir: - A boolean value indicating whether to copy the directories - containing the scripts to the cluster. - scripts: - The set of scripts to copy to the cluster. - actors: - The Ray actors which represent the job to be run. This attribute is - dumped to a JSON file and copied to the cluster where `ray_main.py` - uses it to initiate the job. - verbose: - A boolean value indicating whether to enable verbose output. - """ - - app_id: str - cluster_config_file: str - cluster_name: Optional[str] = None - copy_scripts: bool = False - copy_script_dirs: bool = False - scripts: Set[str] = field(default_factory=set) - actors: List[RayActor] = field(default_factory=list) - verbose: bool = False - - -class RayScheduler(Scheduler): - def __init__(self, session_name: str) -> None: - super().__init__("ray", session_name) - - def run_opts(self) -> runopts: - opts = runopts() - opts.add( - "cluster_config_file", - type_=str, - required=True, - help="Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.", - ) - opts.add( - "cluster_name", - type_=str, - help="Override the configured cluster name.", - ) - opts.add( - "copy_scripts", - type_=bool, - default=False, - help="Copy the Python script(s) to the cluster.", - ) - opts.add( - "copy_script_dirs", - type_=bool, - default=False, - help="Copy the directories containing the Python scripts to the cluster.", - ) - opts.add( - "verbose", - type_=bool, - default=False, - help="Enable verbose output.", - ) - return opts - - def schedule(self, dryrun_info: AppDryRunInfo[RayJob]) -> str: - raise NotImplementedError() - - def _submit_dryrun( - self, app: AppDef, cfg: Mapping[str, CfgVal] - ) -> AppDryRunInfo[RayJob]: - app_id = make_unique(app.name) - - cluster_cfg = cfg.get("cluster_config_file") - if not isinstance(cluster_cfg, str) or not os.path.isfile(cluster_cfg): - raise ValueError("The cluster configuration file must be a YAML file.") - - job: RayJob = RayJob(app_id, cluster_cfg) - - # pyre-ignore[24]: Generic type `type` expects 1 type parameter - def set_job_attr(cfg_name: str, cfg_type: Type) -> None: - cfg_value = cfg.get(cfg_name) - if cfg_value is None: - return - - if not isinstance(cfg_value, cfg_type): - raise TypeError( - f"The configuration value '{cfg_name}' must be of type {cfg_type.__name__}." + # Encode job submission client in job_id + return f"{ip_address}-{job_id}" + + def _submit_dryrun( + self, app: AppDef, cfg: Mapping[str, CfgVal] + ) -> AppDryRunInfo[RayJob]: + app_id = make_unique(app.name) + + cluster_cfg = cfg.get("cluster_config_file") + if cluster_cfg: + if not isinstance(cluster_cfg, str) or not os.path.isfile(cluster_cfg): + raise ValueError( + "The cluster configuration file must be a YAML file." 
+ ) + job: RayJob = RayJob(app_id, cluster_cfg) + + else: # pragma: no cover + dashboard_address = cfg.get("dashboard_address") + job: RayJob = RayJob( + app_id=app_id, dashboard_address=dashboard_address # pyre-ignore[6] ) - setattr(job, cfg_name, cfg_value) - - set_job_attr("cluster_name", str) - set_job_attr("copy_scripts", bool) - set_job_attr("copy_script_dirs", bool) - set_job_attr("verbose", bool) - - for role in app.roles: - # Replace the ${img_root}, ${app_id}, and ${replica_id} placeholders - # in arguments and environment variables. - role = macros.Values( - img_root=role.image, app_id=app_id, replica_id="${rank}" - ).apply(role) - - actor = RayActor( - name=role.name, - command=" ".join([role.entrypoint] + role.args), - env=role.env, - num_replicas=max(1, role.num_replicas), - num_cpus=max(1, role.resource.cpu), - num_gpus=max(0, role.resource.gpu), - ) + # pyre-ignore[24]: Generic type `type` expects 1 type parameter + def set_job_attr(cfg_name: str, cfg_type: Type) -> None: + cfg_value = cfg.get(cfg_name) + if cfg_value is None: + return + + if not isinstance(cfg_value, cfg_type): + raise TypeError( + f"The configuration value '{cfg_name}' must be of type {cfg_type.__name__}." + ) + + setattr(job, cfg_name, cfg_value) + + set_job_attr("cluster_name", str) + set_job_attr("working_dir", str) + + for role in app.roles: + # Replace the ${img_root}, ${app_id}, and ${replica_id} placeholders + # in arguments and environment variables. + role = macros.Values( + img_root=role.image, app_id=app_id, replica_id="${rank}" + ).apply(role) + + actor = RayActor( + name=role.name, + command=" ".join([role.entrypoint] + role.args), + env=role.env, + num_replicas=max(1, role.num_replicas), + num_cpus=max(1, role.resource.cpu), + num_gpus=max(0, role.resource.gpu), + ) - job.actors.append(actor) + job.actors.append(actor) - if job.copy_scripts or job.copy_script_dirs: - # Find out the actual user script. - for arg in role.args: - if arg.endswith(".py"): - job.scripts.add(arg) + return AppDryRunInfo(job, repr) - return AppDryRunInfo(job, repr) + def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None: + if scheduler != "ray": + raise ValueError( + f"An unknown scheduler backend '{scheduler}' has been passed to the Ray scheduler." + ) - def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None: - if scheduler != "ray": - raise ValueError( - f"An unknown scheduler backend '{scheduler}' has been passed to the Ray scheduler." + if app.metadata: + _logger.warning("The Ray scheduler does not use metadata information.") + + for role in app.roles: + if role.resource.capabilities: + _logger.warning( + "The Ray scheduler does not support custom resource capabilities." 
+ ) + break + + for role in app.roles: + if role.port_map: + _logger.warning("The Ray scheduler does not support port mapping.") + break + + def wait_until_finish(self, app_id: str, timeout: int = 30) -> None: + addr, app_id = app_id.split("-") + + client = JobSubmissionClient(f"http://{addr}") + start = time.time() + while time.time() - start <= timeout: + status_info = client.get_job_status(app_id) + status = status_info.status + if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}: + break + time.sleep(1) + + def _cancel_existing(self, app_id: str) -> None: # pragma: no cover + addr, app_id = app_id.split("-") + client = JobSubmissionClient(f"http://{addr}") + client.stop_job(app_id) + + def describe(self, app_id: str) -> Optional[DescribeAppResponse]: + addr, app_id = app_id.split("-") + client = JobSubmissionClient(f"http://{addr}") + status = client.get_job_status(app_id).status + _logger.debug(f"Status is {status}") + status = _ray_status_to_torchx_appstate[status] + roles = [Role(name="ray", num_replicas=1, image="")] + roles_statuses = [ + RoleStatus( + role="ray", + replicas=[ + ReplicaStatus(id=0, role="ray", hostname="", state=status) + ], + ) + ] + return DescribeAppResponse( + app_id=app_id, state=status, roles_statuses=roles_statuses, roles=roles ) - if app.metadata: - _logger.warning("The Ray scheduler does not use metadata information.") + def log_iter( + self, + app_id: str, + role_name: Optional[str] = None, + k: int = 0, + regex: Optional[str] = None, + since: Optional[datetime] = None, + until: Optional[datetime] = None, + should_tail: bool = False, + streams: Optional[Stream] = None, + ) -> List[str]: + # TODO: support regex, tailing, streams etc.. + addr, app_id = app_id.split("-") + client: JobSubmissionClient = JobSubmissionClient(f"http://{addr}") + logs: str = client.get_job_logs(app_id) + return logs.split("\n") + + def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler: + if not has_ray(): # pragma: no cover + raise RuntimeError( + "Ray is not installed in the current Python environment." + ) - for role in app.roles: - if role.resource.capabilities: - _logger.warning( - "The Ray scheduler does not support custom resource capabilities." - ) - break - - for role in app.roles: - if role.port_map: - _logger.warning("The Ray scheduler does not support port mapping.") - break - - def _cancel_existing(self, app_id: str) -> None: - raise NotImplementedError() - - def describe(self, app_id: str) -> Optional[DescribeAppResponse]: - raise NotImplementedError() - - def log_iter( - self, - app_id: str, - role_name: str, - k: int = 0, - regex: Optional[str] = None, - since: Optional[datetime] = None, - until: Optional[datetime] = None, - should_tail: bool = False, - streams: Optional[Stream] = None, - ) -> Iterable[str]: - raise NotImplementedError() - - -def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler: - if not has_ray(): - raise RuntimeError("Ray is not installed in the current Python environment.") - - return RayScheduler(session_name=session_name) + return RayScheduler(session_name=session_name) diff --git a/torchx/schedulers/test/ray_scheduler_test.py b/torchx/schedulers/test/ray_scheduler_test.py index f67f24584..4a683e9aa 100644 --- a/torchx/schedulers/test/ray_scheduler_test.py +++ b/torchx/schedulers/test/ray_scheduler_test.py @@ -4,33 +4,44 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
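An end-to-end usage sketch mirroring the tests that follow: `127.0.0.1:8265` is the `dashboard_address` default from `run_opts` and assumes a local `ray start --head`; the entrypoint and `working_dir` values are hypothetical, and `_submit_dryrun` is the internal hook the unit tests call directly (the torchx runner/CLI normally drives these steps):

```python
from torchx.schedulers.ray_scheduler import create_scheduler
from torchx.specs import AppDef, Resource, Role

app = AppDef(
    name="ddp_check",
    roles=[
        Role(
            name="trainer",
            image="",
            entrypoint="train.py",          # assumed to exist in working_dir
            num_replicas=2,
            resource=Resource(cpu=2, gpu=0, memMB=4096),
        )
    ],
)

scheduler = create_scheduler("demo_session")
dryrun_info = scheduler._submit_dryrun(
    app, cfg={"dashboard_address": "127.0.0.1:8265", "working_dir": "."}
)
app_id = scheduler.schedule(dryrun_info)        # "<dashboard_address>-<ray job id>"
scheduler.wait_until_finish(app_id, timeout=300)
print("\n".join(scheduler.log_iter(app_id)))
```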
+import os from contextlib import contextmanager from dataclasses import dataclass -from typing import Any, Dict, Iterator, Type +from typing import Any, Iterator, Type, Optional, Dict, List, cast from unittest import TestCase from unittest.mock import patch -from torchx.schedulers.ray_scheduler import RayScheduler, _logger, has_ray +import ray +from torchx.schedulers import get_schedulers +from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse +from torchx.schedulers.ray import ray_driver +from torchx.schedulers.ray.ray_common import RayActor +from torchx.schedulers.ray_scheduler import ( + RayScheduler, + _logger, + has_ray, + RayJob, + serialize, +) from torchx.specs import AppDef, CfgVal, Resource, Role, runopts if has_ray(): - # TODO(aivanou): enable after 0.1.1 release - # class RaySchedulerRegistryTest(TestCase): - # def test_get_schedulers_returns_ray_scheduler(self) -> None: - # schedulers = get_schedulers("test_session") + class RaySchedulerRegistryTest(TestCase): + def test_get_schedulers_returns_ray_scheduler(self) -> None: + schedulers = get_schedulers("test_session") - # self.assertIn("ray", schedulers) + self.assertIn("ray", schedulers) - # scheduler = schedulers["ray"] + scheduler = schedulers["ray"] - # self.assertIsInstance(scheduler, RayScheduler) + self.assertIsInstance(scheduler, RayScheduler) - # ray_scheduler = cast(RayScheduler, scheduler) + ray_scheduler = cast(RayScheduler, scheduler) - # self.assertEqual(ray_scheduler.backend, "ray") - # self.assertEqual(ray_scheduler.session_name, "test_session") + self.assertEqual(ray_scheduler.backend, "ray") + self.assertEqual(ray_scheduler.session_name, "test_session") class RaySchedulerTest(TestCase): def setUp(self) -> None: @@ -60,9 +71,8 @@ def setUp(self) -> None: self._run_cfg: Dict[str, CfgVal] = { "cluster_config_file": "dummy_file", "cluster_name": "dummy_name", - "copy_scripts": True, - "copy_script_dirs": True, - "verbose": True, + "working_dir": None, + "requirements": None, } self._scheduler = RayScheduler("test_session") @@ -103,11 +113,11 @@ def assert_option(expected_opt: Option) -> None: self.assertEqual(opt.default, expected_opt.default) expected_opts = [ - Option("cluster_config_file", str, is_required=True), + Option("cluster_config_file", str, is_required=False), Option("cluster_name", str), - Option("copy_scripts", bool, default=False), - Option("copy_script_dirs", bool, default=False), - Option("verbose", bool, default=False), + Option("dashboard_address", str, default="127.0.0.1:8265"), + Option("working_dir", str, is_required=False), + Option("requirements", str, is_required=False), ] self.assertEqual(len(opts), len(expected_opts)) @@ -204,17 +214,6 @@ def _assert_config_value(self, name: str, value: Any, type_name: str) -> None: def test_submit_dryrun_raises_error_if_cluster_name_is_not_str(self) -> None: self._assert_config_value("cluster_name", 1, "str") - def test_submit_dryrun_raises_error_if_copy_scripts_is_not_bool(self) -> None: - self._assert_config_value("copy_scripts", "dummy_value", "bool") - - def test_submit_dryrun_raises_error_if_copy_script_dirs_is_not_bool( - self, - ) -> None: - self._assert_config_value("copy_script_dirs", "dummy_value", "bool") - - def test_submit_dryrun_raises_error_if_verbose_is_not_bool(self) -> None: - self._assert_config_value("verbose", "dummy_value", "bool") - def _assert_submit_dryrun_constructs_job_definition(self) -> None: run_info = self._scheduler._submit_dryrun(self._app_def, self._run_cfg) @@ -227,13 +226,6 @@ def 
_assert_submit_dryrun_constructs_job_definition(self) -> None: job.cluster_config_file, self._run_cfg.get("cluster_config_file") ) self.assertEqual(job.cluster_name, self._run_cfg.get("cluster_name")) - self.assertEqual( - job.copy_scripts, self._run_cfg.get("copy_scripts") or False - ) - self.assertEqual( - job.copy_script_dirs, self._run_cfg.get("copy_script_dirs") or False - ) - self.assertEqual(job.verbose, self._run_cfg.get("verbose") or False) for actor, role in zip(job.actors, self._app_def.roles): self.assertEqual(actor.name, role.name) @@ -242,19 +234,14 @@ def _assert_submit_dryrun_constructs_job_definition(self) -> None: self.assertEqual(actor.num_replicas, max(1, role.num_replicas)) self.assertEqual(actor.num_cpus, max(1, role.resource.cpu)) self.assertEqual(actor.num_gpus, max(0, role.resource.gpu)) - - if job.copy_scripts: - self.assertEqual(job.scripts, set(self._scripts)) - else: self.assertEqual(job.scripts, set()) def test_submit_dryrun_constructs_job_definition(self) -> None: self._assert_submit_dryrun_constructs_job_definition() self._run_cfg["cluster_name"] = None - self._run_cfg["copy_scripts"] = False - self._run_cfg["copy_script_dirs"] = False - self._run_cfg["verbose"] = None + self._run_cfg["working_dir"] = None + self._run_cfg["requirements"] = None self._assert_submit_dryrun_constructs_job_definition() @@ -266,3 +253,109 @@ def test_submit_dryrun_constructs_actor_command(self) -> None: self.assertEqual( job.actors[0].command, "dummy_entrypoint1 arg1 dummy1.py arg2" ) + + class RayClusterSetup: + _instance = None # pyre-ignore[4] + + def __new__(cls): # pyre-ignore[3] + if cls._instance is None: + cls._instance = super(RayClusterSetup, cls).__new__(cls) + ray.shutdown() + start_status: int = os.system("ray start --head") + if start_status != 0: + raise AssertionError( + "ray start --head command has failed. 
Cannot proceed with running tests" + ) + ray.init(address="auto", ignore_reinit_error=True) + cls.reference_count: int = 2 + return cls._instance + + def decrement_reference(cls) -> None: + cls.reference_count = cls.reference_count - 1 + if cls.reference_count == 0: + cls.teardown_ray_cluster() + + def teardown_ray_cluster(cls) -> None: + ray.shutdown() + + class RayDriverTest(TestCase): + def test_command_actor_setup(self) -> None: + ray_cluster_setup = RayClusterSetup() + + actor1 = RayActor( + name="test_actor_1", command="python 1 2", env={"fake": "1"} + ) + actor2 = RayActor( + name="test_actor_2", command="python 3 4", env={"fake": "2"} + ) + actors = [actor1, actor2] + current_dir = os.path.dirname(os.path.realpath(__file__)) + serialize(actors, current_dir) + + loaded_actor = ray_driver.load_actor_json( + os.path.join(current_dir, "actors.json") + ) + assert loaded_actor == actors + + pgs = ray_driver.create_placement_groups(actors) + assert len(pgs) >= 1 + + command_actors = ray_driver.create_command_actors(actors, pgs) + assert len(command_actors) >= 1 + ray_cluster_setup.decrement_reference() + + class RayIntegrationTest(TestCase): + def test_ray_cluster(self) -> None: + ray_cluster_setup = RayClusterSetup() + ray_scheduler = self.setup_ray_cluster() + assert ray.is_initialized() is True + + job_id = self.schedule_ray_job(ray_scheduler) + assert job_id is not None + + ray_scheduler.wait_until_finish(job_id, 100) + + logs = self.check_logs(ray_scheduler=ray_scheduler, app_id=job_id) + print(logs) + assert logs is not None + + status = self.describe(ray_scheduler, job_id) + assert status is not None + + ray_cluster_setup.decrement_reference() + + def setup_ray_cluster(self) -> RayScheduler: + ray_scheduler = RayScheduler(session_name="test") + return ray_scheduler + + def schedule_ray_job( + self, ray_scheduler: RayScheduler, app_id: str = "123" + ) -> str: + current_dir = os.path.dirname(os.path.realpath(__file__)) + actor = RayActor( + name="ddp", + num_cpus=2, + num_replicas=2, + command=os.path.join(current_dir, "train.py"), + ) + + ray_job = RayJob( + app_id=app_id, + dashboard_address="127.0.0.1:8265", + scripts=set([os.path.join(current_dir, "train.py")]), + actors=[actor], + ) + app_info = AppDryRunInfo(ray_job, repr) + job_id = ray_scheduler.schedule(app_info) + return job_id + + def describe( + self, ray_scheduler: RayScheduler, app_id: str = "123" + ) -> Optional[DescribeAppResponse]: + return ray_scheduler.describe(app_id) + + def check_logs( + self, ray_scheduler: RayScheduler, app_id: str = "123" + ) -> List[str]: + logs: List[str] = ray_scheduler.log_iter(app_id=app_id) + return logs diff --git a/torchx/schedulers/test/train.py b/torchx/schedulers/test/train.py new file mode 100644 index 000000000..38c650625 --- /dev/null +++ b/torchx/schedulers/test/train.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
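For reference, the `describe()` call exercised in the integration test above collapses the whole Ray job into a single synthetic `"ray"` role, since the Job Submission API gives no per-role breakdown. An illustrative instance of the response it builds (the `app_id` value is hypothetical):

```python
from torchx.schedulers.api import AppState, DescribeAppResponse
from torchx.specs import ReplicaStatus, Role, RoleStatus

resp = DescribeAppResponse(
    app_id="raysubmit_6Fg7hk2V",                 # hypothetical Ray job id
    state=AppState.RUNNING,
    roles=[Role(name="ray", num_replicas=1, image="")],
    roles_statuses=[
        RoleStatus(
            role="ray",
            replicas=[ReplicaStatus(id=0, role="ray", hostname="", state=AppState.RUNNING)],
        )
    ],
)
```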
+ +import os + +import torch +import torch.nn.functional as F +from torch.distributed import init_process_group, all_reduce, get_rank, get_world_size + + +def compute_world_size() -> int: + + rank = int(os.getenv("RANK")) # pyre-ignore[6] + world_size = int(os.getenv("WORLD_SIZE")) # pyre-ignore[6] + master_port = int(os.getenv("MASTER_PORT")) # pyre-ignore[6] + master_addr = os.getenv("MASTER_ADDR") + backend = "gloo" + + print(f"initializing `{backend}` process group") + init_process_group( # pyre-ignore[16] + backend=backend, + init_method=f"tcp://{master_addr}:{master_port}", + rank=rank, + world_size=world_size, + ) + print("successfully initialized process group") + + rank = get_rank() # pyre-ignore[16] + world_size = get_world_size() # pyre-ignore[16] + + t = F.one_hot(torch.tensor(rank), num_classes=world_size) + all_reduce(t) # pyre-ignore[16] + computed_world_size = int(torch.sum(t).item()) + print( + f"rank: {rank}, actual world_size: {world_size}, computed world_size: {computed_world_size}" + ) + return computed_world_size + + +def main() -> None: + compute_world_size() + + +if __name__ == "__main__": + main()
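A standalone illustration of why the check in `train.py` works: each rank contributes a one-hot vector at its own index, so after `all_reduce` (sum) every rank holds a vector of ones whose total equals the world size. Simulated here in a single process with `world_size = 3`:

```python
import torch
import torch.nn.functional as F

world_size = 3
# Sum of one_hot(0), one_hot(1), one_hot(2) is what all_reduce(SUM) would leave on every rank.
reduced = sum(F.one_hot(torch.tensor(rank), num_classes=world_size) for rank in range(world_size))
assert reduced.tolist() == [1, 1, 1]
assert int(torch.sum(reduced).item()) == world_size
```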