Skip to content

[job submission] Add stop API + subprocess cleanup #19860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
552a147
initial stop job coro with async job actor
jiaodong Oct 29, 2021
5a27f97
travis
jiaodong Oct 29, 2021
d9c271c
Merge branch 'master' into stop_job
jiaodong Oct 29, 2021
cf365c4
tests passing
jiaodong Oct 29, 2021
d02e2d6
travis'
jiaodong Oct 29, 2021
5097647
fix http server tests with status polling
jiaodong Oct 29, 2021
4a00bac
travis
jiaodong Oct 29, 2021
ddeeb51
remove actor level status with passing tests
jiaodong Oct 30, 2021
2d11d84
[tests passed] GCS based status handling, failed pending runtime env …
jiaodong Oct 30, 2021
d1ff8f8
improvements and added new tests
jiaodong Oct 30, 2021
b719c72
Merge branch 'master' into stop_job
jiaodong Oct 30, 2021
dc9ec41
rebase and remove serve logger
jiaodong Oct 30, 2021
0d961ec
much simpler version with asyncio coros
jiaodong Nov 1, 2021
383ab20
Merge branch 'master' into stop_job
jiaodong Nov 1, 2021
e6e9b1d
passed all job manager and http tests
jiaodong Nov 2, 2021
de7b5ad
http server polishing
jiaodong Nov 2, 2021
a863627
added actor - subprocess fate sharing with all tests passed
jiaodong Nov 2, 2021
0bc1b69
Merge branch 'master' into stop_job
jiaodong Nov 2, 2021
372a07f
address some comments
jiaodong Nov 2, 2021
0232e1e
Merge branch 'master' of https://github.com/ray-project/ray into jiao…
edoakes Nov 2, 2021
60c7855
fix
edoakes Nov 2, 2021
5b13eba
move job manager tests to ray/tests and add to BUILD file
jiaodong Nov 3, 2021
3546a25
fix paths
edoakes Nov 3, 2021
169120d
Merge branch 'master' of https://github.com/ray-project/ray into stop…
edoakes Nov 3, 2021
366e2af
fix indentation
edoakes Nov 3, 2021
0b3c59b
remove timeout logic
edoakes Nov 3, 2021
8f60718
Revert "remove timeout logic"
edoakes Nov 3, 2021
de72c4c
[CI] Run Java CI on Mac (#19757)
jjyao Nov 3, 2021
2354312
fix BUILD glob rule and adding logger.error for debugging
jiaodong Nov 3, 2021
cbd05ce
Fix child watcher does not have a loop attached error on CI by fallin…
jiaodong Nov 3, 2021
088d0b2
travis'
jiaodong Nov 3, 2021
331a661
remove job actor starting timeout
jiaodong Nov 3, 2021
070d176
Merge branch 'master' into stop_job
jiaodong Nov 3, 2021
23e121e
remove logger lines using startup timeout constant, unblock test fail…
jiaodong Nov 4, 2021
442af71
skip on windows
edoakes Nov 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/travis/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ test_python() {
-python/ray/serve:test_get_deployment # address violation
-python/ray/tests:test_global_gc
-python/ray/tests:test_job
-python/ray/tests:test_job_manager
-python/ray/tests:test_memstat
-python/ray/tests:test_metrics
-python/ray/tests:test_metrics_agent # timeout
Expand Down
61 changes: 17 additions & 44 deletions dashboard/modules/job/data_types.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
from enum import Enum
from typing import Any, Dict

try:
from pydantic import BaseModel
except ImportError:
# Lazy import without breaking class def
BaseModel = object
Copy link
Member Author

@jiaodong jiaodong Nov 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing tests actually won't pass if the user doesn't have pydantic installed, since a plain Python object and BaseModel have different APIs for creating and serializing objects. Because we can't add pydantic to OSS yet due to its size (30MB), this PR falls back to dataclasses.

from dataclasses import dataclass


class JobStatus(str, Enum):
Expand All @@ -19,67 +14,45 @@ def __str__(self):
FAILED = "FAILED"


class JobSpec(BaseModel):
# Dict to setup execution environment, better to have schema for this
runtime_env: Dict[str, Any]
# Command to start execution, ex: "python script.py"
entrypoint: str
# Metadata to pass in to configure job behavior or use as tags
# Required by Anyscale product and already supported in Ray drivers
metadata: Dict[str, str]
# Likely there will be more fields needed later on for different apps
# but we should keep it minimal and delegate policies to job manager


# ==== Get Package ====


class GetPackageRequest(BaseModel):
package_uri: str


class GetPackageResponse(BaseModel):
@dataclass
class GetPackageResponse:
package_exists: bool


# ==== Upload Package ====


class UploadPackageRequest(BaseModel):
package_uri: str
encoded_package_bytes: str


# ==== Job Submit ====


class JobSubmitRequest(BaseModel):
job_spec: JobSpec
@dataclass
class JobSubmitRequest:
# Dict to setup execution environment.
runtime_env: Dict[str, Any]
# Command to start execution, ex: "python script.py"
entrypoint: str
# Metadata to pass in to the JobConfig.
metadata: Dict[str, str]


class JobSubmitResponse(BaseModel):
@dataclass
class JobSubmitResponse:
job_id: str


# ==== Job Status ====


class JobStatusRequest(BaseModel):
job_id: str


class JobStatusResponse(BaseModel):
@dataclass
class JobStatusResponse:
job_status: JobStatus


# ==== Job Logs ====


class JobLogsRequest(BaseModel):
job_id: str


# TODO(jiaodong): Support log streaming #19415
class JobLogsResponse(BaseModel):
@dataclass
class JobLogsResponse:
stdout: str
stderr: str
88 changes: 34 additions & 54 deletions dashboard/modules/job/job_head.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import aiohttp.web
from base64 import b64decode
from functools import wraps
import logging
from typing import Callable
import json
import dataclasses

import ray
import ray.dashboard.utils as dashboard_utils
from ray._private.job_manager import JobManager
from ray._private.runtime_env.packaging import (package_exists,
upload_package_to_gcs)
from ray.dashboard.modules.job.data_types import (
GetPackageRequest, GetPackageResponse, UploadPackageRequest, JobStatus,
JobSubmitRequest, JobSubmitResponse, JobStatusRequest, JobStatusResponse,
JobLogsRequest, JobLogsResponse)
GetPackageResponse, JobStatus, JobSubmitRequest, JobSubmitResponse,
JobStatusResponse, JobLogsResponse)

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
Expand Down Expand Up @@ -40,81 +40,61 @@ def __init__(self, dashboard_head):
@_ensure_ray_initialized
async def get_package(self,
req: aiohttp.web.Request) -> aiohttp.web.Response:
req_data = await req.json()
package_uri = GetPackageRequest(**req_data).package_uri
already_exists = package_exists(package_uri)
exists_str = "exists" if already_exists else "does not exist"
return dashboard_utils.rest_response(
success=True,
convert_google_style=False,
data=GetPackageResponse(package_exists=already_exists).dict(),
message=f"Package {package_uri} {exists_str}.")
package_uri = req.query["package_uri"]
resp = GetPackageResponse(package_exists=package_exists(package_uri))
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json")

@routes.put("/package")
@_ensure_ray_initialized
async def upload_package(self,
req: aiohttp.web.Request) -> aiohttp.web.Response:
req_data = await req.json()
upload_req = UploadPackageRequest(**req_data)
package_uri = upload_req.package_uri
async def upload_package(self, req: aiohttp.web.Request):
package_uri = req.query["package_uri"]
logger.info(f"Uploading package {package_uri} to the GCS.")
upload_package_to_gcs(package_uri,
b64decode(upload_req.encoded_package_bytes))
return dashboard_utils.rest_response(
success=True,
convert_google_style=False,
message=f"Successfully uploaded {package_uri}.")
upload_package_to_gcs(package_uri, await req.read())

return aiohttp.web.Response()

@routes.post("/submit")
@_ensure_ray_initialized
async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
req_data = await req.json()
submit_request = JobSubmitRequest(**req_data)
# TODO: (jiaodong) Validate if job request is valid without using
# pydantic.
submit_request = JobSubmitRequest(**(await req.json()))
job_id = self._job_manager.submit_job(
submit_request.job_spec.entrypoint,
runtime_env=submit_request.job_spec.runtime_env,
metadata=submit_request.job_spec.metadata)
entrypoint=submit_request.entrypoint,
runtime_env=submit_request.runtime_env,
metadata=submit_request.metadata)

resp = JobSubmitResponse(job_id=job_id)
return dashboard_utils.rest_response(
success=True,
convert_google_style=False,
data=resp.dict(),
message=f"Submitted job {job_id}.")
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json")

@routes.get("/status")
@_ensure_ray_initialized
async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
req_data = dict(await req.json())
status_request = JobStatusRequest(**req_data)
job_id = req.query["job_id"]
status: JobStatus = self._job_manager.get_job_status(job_id)

status: JobStatus = self._job_manager.get_job_status(
status_request.job_id)
resp = JobStatusResponse(job_status=status)
return dashboard_utils.rest_response(
success=True,
convert_google_style=False,
data=resp.dict(),
message=f"Queried status for job {status_request.job_id}")
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json")

@routes.get("/logs")
@_ensure_ray_initialized
async def logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
req_data = dict(await req.json())
logs_request = JobLogsRequest(**req_data)

stdout: bytes = self._job_manager.get_job_stdout(logs_request.job_id)
stderr: bytes = self._job_manager.get_job_stderr(logs_request.job_id)
job_id = req.query["job_id"]
stdout: bytes = self._job_manager.get_job_stdout(job_id)
stderr: bytes = self._job_manager.get_job_stderr(job_id)

# TODO(jiaodong): Support log streaming #19415
resp = JobLogsResponse(
stdout=stdout.decode("utf-8"), stderr=stderr.decode("utf-8"))

return dashboard_utils.rest_response(
success=True,
convert_google_style=False,
data=resp.dict(),
message=f"Logs returned for job {logs_request.job_id}")
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json")

async def run(self, server):
if not self._job_manager:
Expand Down
83 changes: 43 additions & 40 deletions dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
from base64 import b64encode
import dataclasses
import logging
from pathlib import Path
import tempfile
from typing import Any, Dict, List, Optional, Tuple

try:
from pydantic import BaseModel
from pydantic.main import ModelMetaclass
except ImportError:
BaseModel = object
ModelMetaclass = object

import requests

from ray._private.runtime_env.packaging import (
create_package, get_uri_for_directory, parse_uri)
from ray._private.job_manager import JobStatus
from ray.dashboard.modules.job.data_types import (
GetPackageRequest, GetPackageResponse, UploadPackageRequest, JobSpec,
JobSubmitRequest, JobSubmitResponse, JobStatusRequest, JobStatusResponse,
JobLogsRequest, JobLogsResponse)
GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStatusResponse,
JobLogsResponse)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand All @@ -37,32 +29,33 @@ def _test_connection(self):
raise ConnectionError(
f"Failed to connect to Ray at address: {self._address}.")

def _do_request(
self,
method: str,
endpoint: str,
data: BaseModel,
response_type: Optional[ModelMetaclass] = None) -> Dict[Any, Any]:
def _do_request(self,
method: str,
endpoint: str,
*,
data: Optional[bytes] = None,
json_data: Optional[dict] = None,
params: Optional[dict] = None,
response_type: Optional[type] = None) -> Optional[object]:
url = f"{self._address}/{endpoint}"
json_payload = data.dict()
logger.debug(f"Sending request to {url} with payload {json_payload}.")
r = requests.request(method, url, json=json_payload)
r.raise_for_status()

response_json = r.json()
if not response_json["result"]: # Indicates failure.
raise Exception(response_json["msg"])
logger.info(f"Sending request to {url} with json: {json_data}.")
r = requests.request(
method, url, data=data, json=json_data, params=params)

r.raise_for_status()
if response_type is None:
return None
else:
# Dashboard "framework" returns double-nested "data" field...
return response_type(**response_json["data"]["data"])
response = r.json()
logger.info(f"Got response: {response}.")
return response_type(**response)

def _package_exists(self, package_uri: str) -> bool:
req = GetPackageRequest(package_uri=package_uri)
resp = self._do_request(
"GET", "package", req, response_type=GetPackageResponse)
"GET",
"package",
params={"package_uri": package_uri},
response_type=GetPackageResponse)
return resp.package_exists

def _upload_package(self,
Expand All @@ -78,10 +71,11 @@ def _upload_package(self,
package_file,
include_parent_dir=include_parent_dir,
excludes=excludes)
req = UploadPackageRequest(
package_uri=package_uri,
encoded_package_bytes=b64encode(package_file.read_bytes()))
self._do_request("PUT", "package", req)
self._do_request(
"PUT",
"package",
data=package_file.read_bytes(),
params={"package_uri": package_uri})
package_file.unlink()

def _upload_package_if_needed(self,
Expand Down Expand Up @@ -122,18 +116,27 @@ def submit_job(self,
metadata = metadata or {}

self._upload_working_dir_if_needed(runtime_env)
job_spec = JobSpec(
req = JobSubmitRequest(
entrypoint=entrypoint, runtime_env=runtime_env, metadata=metadata)
req = JobSubmitRequest(job_spec=job_spec)
resp = self._do_request("POST", "submit", req, JobSubmitResponse)
resp = self._do_request(
"POST",
"submit",
json_data=dataclasses.asdict(req),
response_type=JobSubmitResponse)
return resp.job_id

def get_job_status(self, job_id: str) -> JobStatus:
req = JobStatusRequest(job_id=job_id)
resp = self._do_request("GET", "status", req, JobStatusResponse)
resp = self._do_request(
"GET",
"status",
params={"job_id": job_id},
response_type=JobStatusResponse)
return resp.job_status

def get_job_logs(self, job_id: str) -> Tuple[str, str]:
req = JobLogsRequest(job_id=job_id)
resp = self._do_request("GET", "logs", req, JobLogsResponse)
resp = self._do_request(
"GET",
"logs",
params={"job_id": job_id},
response_type=JobLogsResponse)
return resp.stdout, resp.stderr
1 change: 1 addition & 0 deletions dashboard/modules/job/tests/test_http_job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ray.dashboard.modules.job.sdk import JobSubmissionClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@pytest.fixture
Expand Down
Loading