Ray scheduler driver and job api #329

Closed · wants to merge 1 commit

2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -10,8 +10,8 @@ kfp==1.8.9
moto==2.2.12
pyre-extensions==0.0.21
pytorch-lightning==1.5.6
-ray==1.9.0
s3fs==2021.10.1
+ray[default]==1.9.2
torch-model-archiver==0.4.2
torch==1.10.0
torchserve==0.4.2
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -96,6 +96,7 @@ Works With
   schedulers/local
   schedulers/kubernetes
   schedulers/slurm
+   schedulers/ray

.. _Pipelines:
.. toctree::
8 changes: 8 additions & 0 deletions docs/source/schedulers/ray.rst
@@ -0,0 +1,8 @@
Ray
=================

.. automodule:: torchx.schedulers.ray_scheduler
.. currentmodule:: torchx.schedulers.ray_scheduler

.. autoclass:: RayScheduler
   :members:
7 changes: 7 additions & 0 deletions scripts/component_integration_tests.py
@@ -37,6 +37,13 @@ def get_k8s_sched_info(image: str) -> SchedulerInfo:
    return SchedulerInfo(name="kubernetes", image=image, cfg=cfg)


def get_ray_sched_info(image: str) -> SchedulerInfo:
    cfg = {
        "namespace": "torchx-dev",
    }
    return SchedulerInfo(name="ray", image=image, cfg=cfg)


def get_local_cwd_sched_info(image: str) -> SchedulerInfo:
    return SchedulerInfo(name="local_cwd", image=image)

2 changes: 1 addition & 1 deletion torchx/cli/cmd_log.py
@@ -28,7 +28,7 @@


def validate(job_identifier: str) -> None:
-    if not re.match(r"^\w+://[^/.]*/[^/.]+(/[^/.]+(/(\d+,?)+)?)?$", job_identifier):
+    if not re.match(r"^\w+://[^/]*/[^/]+(/[^/]+(/(\d+,?)+)?)?$", job_identifier):
        logger.error(
            f"{job_identifier} is not of the form {ID_FORMAT}",
        )
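For context, a quick sketch of what the relaxed pattern accepts; the identifiers below are made up, and `JOB_ID_RE` is a local name for illustration (the module itself formats errors with its own `ID_FORMAT` string):

```python
import re

# The new pattern from this diff: path segments may now contain dots.
JOB_ID_RE = r"^\w+://[^/]*/[^/]+(/[^/]+(/(\d+,?)+)?)?$"

assert re.match(JOB_ID_RE, "kubernetes://session/app/role/0,1")  # scheme://session/app/role/replica-ids
assert re.match(JOB_ID_RE, "ray://session/app-id.with.dots")     # rejected by the old [^/.] classes
assert not re.match(JOB_ID_RE, "missing-scheme")
```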
3 changes: 3 additions & 0 deletions torchx/components/test/utils_test.py
@@ -15,6 +15,9 @@ def test_sh(self) -> None:
    def test_python(self) -> None:
        self.validate(utils, "python")

    def test_binary(self) -> None:
        self.validate(utils, "binary")

    def test_touch(self) -> None:
        self.validate(utils, "touch")
30 changes: 30 additions & 0 deletions torchx/components/utils.py
@@ -147,6 +147,36 @@ def python(
    )


def binary(
    *args: str,
    entrypoint: str,
    name: str = "torchx_utils_binary",
    num_replicas: int = 1,
) -> specs.AppDef:
    """
    Test component that runs the given binary ``entrypoint`` with ``args``.

    Args:
        args: arguments passed to the program in sys.argv[1:]
        entrypoint: path of the binary to execute
        name: name of the job
        num_replicas: number of copies to run (each on its own container)
    """
    return specs.AppDef(
        name=name,
        roles=[
            specs.Role(
                name="binary",
                image="<NONE>",
                entrypoint=entrypoint,
                num_replicas=num_replicas,
                resource=specs.Resource(cpu=2, gpu=0, memMB=4096),
                args=[*args],
            )
        ],
    )


def copy(src: str, dst: str, image: str = torchx.IMAGE) -> specs.AppDef:
    """
    copy copies the file from src to dst. src and dst can be any valid fsspec
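As a usage sketch for the new `binary` component (the entrypoint path and arguments below are hypothetical):

```python
# Illustrative only: inspect the AppDef produced by utils.binary.
from torchx.components import utils

app = utils.binary(
    "--foo", "bar",                      # forwarded to the program as sys.argv[1:]
    entrypoint="/usr/local/bin/my_bin",  # hypothetical path to an existing binary
    num_replicas=2,
)
assert app.roles[0].entrypoint == "/usr/local/bin/my_bin"
assert app.roles[0].num_replicas == 2
```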
142 changes: 142 additions & 0 deletions torchx/examples/apps/aws/ray/ray_cluster.yaml
@@ -0,0 +1,142 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: gpu-docker

min_workers: 1
max_workers: 4

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
    image: "rayproject/ray-ml:latest-gpu"
    # image: rayproject/ray:latest-gpu  # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_nvidia_docker"  # e.g. ray_docker


# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    # Availability zone(s), comma-separated, that nodes may be launched in.
    # Nodes are currently spread between zones by a round-robin approach,
    # however this implementation detail should not be relied upon.
    availability_zone: us-west-2a,us-west-2b
    security_group:
        GroupName: dashboard_group
        IpPermissions:
            - FromPort: 20002
              ToPort: 20002
              IpProtocol: TCP
              IpRanges:
                  - CidrIp: 0.0.0.0/0


# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
    # By default Ray creates a new private keypair, but you can also use your own.
    # If you do so, make sure to also set "KeyName" in the head and worker node
    # configurations below.
    # ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    # GPU head node.
    ray.head.gpu:
        # worker_image: rayproject/ray:latest-gpu  # use this one if you don't need ML dependencies, it's faster to pull
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: p2.xlarge
            ImageId: ami-0a2363a9cff180a64  # Deep Learning AMI (Ubuntu) Version 30
            # You can provision additional disk space with a conf as follows
            BlockDeviceMappings:
                - DeviceName: /dev/sda1
                  Ebs:
                      VolumeSize: 100
            # Additional options in the boto docs.
    # CPU workers.
    ray.worker.default:
        # Override global docker setting.
        # This node type will run a CPU image,
        # rather than the GPU image specified in the global docker settings.
        docker:
            worker_image: "rayproject/ray-ml:latest-cpu"
        # The minimum number of nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 1
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 2
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large
            ImageId: ami-0a2363a9cff180a64  # Deep Learning AMI (Ubuntu) Version 30
            # Run workers on spot by default. Comment this out to use on-demand.
            InstanceMarketOptions:
                MarketType: spot
                # Additional options can be found in the boto docs, e.g.
                #   SpotOptions:
                #       MaxPrice: MAX_HOURLY_PRICE
            # Additional options in the boto docs.

# Specify the node type of the head node (as configured above).
head_node_type: ray.head.gpu

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
    # "/path1/on/remote/machine": "/path1/on/local/machine",
    # "/path2/on/remote/machine": "/path2/on/local/machine",
}

# List of shell commands to run to set up nodes.
# NOTE: rayproject/ray:latest has ray latest bundled
setup_commands: []
# - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl
# - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - pip install boto3==1.4.8  # 1.4.8 adds InstanceMarketOptions

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --dashboard-port 20002 --dashboard-host=0.0.0.0 --include-dashboard True --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
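Since the security group above exposes the dashboard on port 20002, a job could then be submitted through Ray's job submission client. A rough sketch, assuming a Ray release that exports `ray.job_submission.JobSubmissionClient` (the import path has moved across Ray versions, and the head IP is a placeholder):

```python
# Hypothetical sketch: submit a job against the cluster from ray_cluster.yaml.
from ray.job_submission import JobSubmissionClient

# Port 20002 matches --dashboard-port in head_start_ray_commands above.
client = JobSubmissionClient("http://<head-node-ip>:20002")
job_id = client.submit_job(entrypoint="python -c 'import ray; print(ray.__version__)'")
print(client.get_job_status(job_id))
```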
20 changes: 19 additions & 1 deletion torchx/schedulers/__init__.py
@@ -5,7 +5,7 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

-from typing import Dict
+from typing import Dict, Optional

import torchx.schedulers.docker_scheduler as docker_scheduler
import torchx.schedulers.kubernetes_scheduler as kubernetes_scheduler
@@ -22,20 +22,38 @@ def __call__(self, session_name: str, **kwargs: object) -> Scheduler:
        ...


def try_get_ray_scheduler() -> Optional[SchedulerFactory]:
    try:
        from torchx.schedulers.ray_scheduler import _has_ray  # @manual

        if _has_ray:
            import torchx.schedulers.ray_scheduler as ray_scheduler  # @manual

            return ray_scheduler.create_scheduler

    except ImportError:  # pragma: no cover
        return None


def get_scheduler_factories() -> Dict[str, SchedulerFactory]:
    """
    get_scheduler_factories returns all the available scheduler names and the
    method to instantiate them.

    The first scheduler in the dictionary is used as the default scheduler.
    """

    default_schedulers: Dict[str, SchedulerFactory] = {
        "local_docker": docker_scheduler.create_scheduler,
        "local_cwd": local_scheduler.create_cwd_scheduler,
        "slurm": slurm_scheduler.create_scheduler,
        "kubernetes": kubernetes_scheduler.create_scheduler,
    }

    ray_scheduler_creator = try_get_ray_scheduler()
    if ray_scheduler_creator:
        default_schedulers["ray"] = ray_scheduler_creator

    return load_group(
        "torchx.schedulers",
        default=default_schedulers,
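For context, a small sketch (not part of the diff) of how the factory registry is consumed; the session name is arbitrary:

```python
# Illustrative usage: "ray" appears only when the ray package is importable.
from torchx.schedulers import get_scheduler_factories

factories = get_scheduler_factories()
print(sorted(factories))  # e.g. ['kubernetes', 'local_cwd', 'local_docker', 'ray', 'slurm']

if "ray" in factories:
    scheduler = factories["ray"](session_name="demo")
```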
6 changes: 6 additions & 0 deletions torchx/schedulers/ray/__init__.py
@@ -0,0 +1,6 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
36 changes: 36 additions & 0 deletions torchx/schedulers/ray/ray_common.py
@@ -0,0 +1,36 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

from dataclasses import dataclass, field
from typing import Dict


@dataclass
class RayActor:
    """Describes an actor (a.k.a. role in TorchX terms).

    Attributes:
        name:
            The name of the actor.
        command:
            The command that the actor should run as a subprocess.
        env:
            The environment variables to set before executing the command.
        num_replicas:
            The number of replicas (i.e. Ray actors) to run.
        num_cpus:
            The number of CPUs to allocate.
        num_gpus:
            The number of GPUs to allocate.
    """

    name: str
    command: str
    env: Dict[str, str] = field(default_factory=dict)
    num_replicas: int = 1
    num_cpus: int = 1
    num_gpus: int = 0
    # TODO: memory_size, max_retries, retry_policy
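To make the mapping from TorchX roles concrete, a hedged sketch of constructing the dataclass above (the command and env values are made up):

```python
# Illustrative only: one RayActor per TorchX role, replicated twice.
from torchx.schedulers.ray.ray_common import RayActor

trainer = RayActor(
    name="trainer",
    command="python -m train --epochs 1",  # hypothetical training command
    env={"LOGLEVEL": "INFO"},
    num_replicas=2,
    num_cpus=1,
    num_gpus=0,
)
```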