Skip to content

Commit 3240111

Browse files
d4l3kfacebook-github-bot
authored andcommitted
component_integration_tests: add aws_batch, ray + more interpretable (#403)
Summary: This adds aws_batch and ray to the integration tests and refactors them to run per scheduler in parallel to make it clearer what's running. Pull Request resolved: #403 Test Plan: CI + pyre Reviewed By: kiukchung Differential Revision: D34568682 Pulled By: d4l3k fbshipit-source-id: e2d18fc8b5f654fab994fafec46df5943da30c3f
1 parent 571fd78 commit 3240111

File tree

8 files changed

+107
-62
lines changed

8 files changed

+107
-62
lines changed

.github/workflows/components-integration-tests.yaml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ on:
99
jobs:
1010
components-launch:
1111
runs-on: ubuntu-18.04
12+
strategy:
13+
matrix:
14+
include:
15+
- scheduler: "aws_batch"
16+
- scheduler: "kubernetes"
17+
- scheduler: "local_cwd"
18+
- scheduler: "local_docker"
19+
- scheduler: "ray"
20+
fail-fast: false
1221
permissions:
1322
id-token: write
1423
contents: read
@@ -48,8 +57,12 @@ jobs:
4857
set -eux
4958
pip install -r dev-requirements.txt
5059
pip install -e .[kubernetes]
60+
61+
- name: Start Ray
62+
if: ${{ matrix.scheduler == 'ray' }}
63+
run: ray start --head
5164
- name: Run Components Integration Tests
5265
env:
5366
INTEGRATION_TEST_STORAGE: ${{ secrets.INTEGRATION_TEST_STORAGE }}
5467
CONTAINER_REPO: ${{ secrets.CONTAINER_REPO }}
55-
run: scripts/component_integration_tests.py
68+
run: scripts/component_integration_tests.py --scheduler ${{ matrix.scheduler }}

scripts/component_integration_tests.py

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"""
99
Kubernetes integration tests.
1010
"""
11+
import argparse
1112
import os
1213

1314
import example_app_defs as examples_app_defs_providers
@@ -17,6 +18,7 @@
1718
IntegComponentTest,
1819
SchedulerInfo,
1920
)
21+
from torchx.schedulers import get_scheduler_factories
2022

2123

2224
# pyre-ignore-all-errors[21] # Cannot find module utils
@@ -44,54 +46,68 @@ def get_ray_sched_info(image: str) -> SchedulerInfo:
4446
return SchedulerInfo(name="ray", image=image, cfg=cfg)
4547

4648

47-
def get_local_cwd_sched_info(image: str) -> SchedulerInfo:
48-
return SchedulerInfo(name="local_cwd", image=image)
49-
50-
51-
def get_local_docker_sched_info(image: str) -> SchedulerInfo:
52-
return SchedulerInfo(name="local_docker", image=image)
49+
def argparser() -> argparse.ArgumentParser:
50+
parser = argparse.ArgumentParser(description="Process some integers.")
51+
choices = list(get_scheduler_factories().keys())
52+
parser.add_argument("--scheduler", required=True, choices=choices)
53+
return parser
5354

5455

5556
def main() -> None:
57+
args = argparser().parse_args()
58+
scheduler = args.scheduler
59+
5660
print("Starting components integration tests")
5761
torchx_image = "dummy_image"
5862
dryrun: bool = False
59-
try:
60-
build = build_and_push_image()
61-
torchx_image = build.torchx_image
62-
except MissingEnvError:
63-
dryrun = True
64-
print("Skip runnig tests, executed only docker buid step")
65-
test_suite = IntegComponentTest(timeout=600) # 10 minutes
66-
test_suite.run_components(
67-
component_provider,
68-
scheduler_infos=[
69-
get_local_cwd_sched_info(os.getcwd()),
70-
get_k8s_sched_info(torchx_image),
71-
],
72-
dryrun=dryrun,
73-
)
74-
75-
# Run components on `local_docker` scheduler in sequence due to
76-
# docker APIs are not atomic. Some of the APIs, e.g. `create_network`
77-
# cause a race condition, making several networks with the same name to be created.
78-
test_suite.run_components(
79-
component_provider,
80-
scheduler_infos=[
81-
get_local_docker_sched_info(torchx_image),
82-
],
83-
dryrun=dryrun,
84-
run_in_parallel=False,
85-
)
86-
87-
test_suite.run_components(
88-
examples_app_defs_providers,
89-
scheduler_infos=[
90-
get_local_docker_sched_info(torchx_image),
91-
get_k8s_sched_info(torchx_image),
92-
],
93-
dryrun=dryrun,
94-
)
63+
if scheduler in ("kubernetes", "local_docker", "aws_batch"):
64+
try:
65+
build = build_and_push_image()
66+
torchx_image = build.torchx_image
67+
except MissingEnvError:
68+
dryrun = True
69+
print("Skip running tests, executed only docker build step")
70+
test_suite: IntegComponentTest = IntegComponentTest(timeout=15 * 60) # 15 minutes
71+
72+
def run_components(info: SchedulerInfo) -> None:
73+
test_suite.run_components(
74+
component_provider,
75+
scheduler_infos=[info],
76+
dryrun=dryrun,
77+
)
78+
79+
def run_examples(info: SchedulerInfo) -> None:
80+
test_suite.run_components(
81+
examples_app_defs_providers,
82+
scheduler_infos=[info],
83+
dryrun=dryrun,
84+
)
85+
86+
if scheduler == "kubernetes":
87+
info = get_k8s_sched_info(torchx_image)
88+
run_components(info)
89+
run_examples(info)
90+
elif scheduler == "local_cwd":
91+
info = SchedulerInfo(name=scheduler, image=os.getcwd())
92+
run_components(info)
93+
elif scheduler == "local_docker":
94+
info = SchedulerInfo(name=scheduler, image=torchx_image)
95+
run_components(info)
96+
run_examples(info)
97+
elif scheduler == "aws_batch":
98+
info = SchedulerInfo(
99+
name=scheduler,
100+
image=torchx_image,
101+
cfg={
102+
"queue": "torchx",
103+
},
104+
)
105+
run_components(info)
106+
elif scheduler == "ray":
107+
info = get_ray_sched_info(torchx_image)
108+
run_components(info)
109+
else:
110+
raise ValueError(f"component tests missing support for {scheduler}")
95111

96112

97113
if __name__ == "__main__":

torchx/components/component_test_base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ def run_appdef_on_scheduler(
5757
app_status = runner.wait(app_handle)
5858
print(f"Final status: {app_status}")
5959
if none_throws(app_status).state != AppState.SUCCEEDED:
60+
for line in runner.log_lines(
61+
app_handle, role_name=app_def.roles[0].name
62+
):
63+
print(f"{app_handle}: {line}")
6064
raise AssertionError(
6165
f"App {app_handle} failed with status: {app_status}"
6266
)

torchx/components/dist.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def ddp(
142142
memMB: int = 1024,
143143
j: str = "1x2",
144144
env: Optional[Dict[str, str]] = None,
145-
max_restarts: Optional[int] = None,
145+
max_retries: int = 0,
146146
rdzv_backend: str = "c10d",
147147
rdzv_endpoint: Optional[str] = None,
148148
) -> specs.AppDef:
@@ -167,7 +167,7 @@ def ddp(
167167
h: a registered named resource (if specified takes precedence over cpu, gpu, memMB)
168168
j: {nnodes}x{nproc_per_node}, for gpu hosts, nproc_per_node must not exceed num gpus
169169
env: environment varibles to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)
170-
max_restarts: the number of restarts allowed
170+
max_retries: the number of scheduler retries allowed
171171
rdzv_backend: rendezvous backend (only matters when nnodes > 1)
172172
rdzv_endpoint: rendezvous server endpoint (only matters when nnodes > 1), defaults to rank0 host for schedulers that support it
173173
"""
@@ -219,8 +219,6 @@ def ddp(
219219
"--nproc_per_node",
220220
str(nproc_per_node),
221221
]
222-
if max_restarts is not None:
223-
cmd += ["--max_restarts", str(max_restarts)]
224222
if script is not None:
225223
cmd += [script]
226224
elif m is not None:
@@ -240,6 +238,7 @@ def ddp(
240238
port_map={
241239
"c10d": 29500,
242240
},
241+
max_retries=max_retries,
243242
)
244243
],
245244
)

torchx/components/integration_tests/component_provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,18 @@ def get_app_def(self) -> AppDef:
4040
script="torchx/components/integration_tests/test/dummy_app.py",
4141
name="ddp-trainer",
4242
image=self._image,
43+
cpu=1,
4344
j="2x2",
44-
max_restarts=3,
45+
max_retries=3,
4546
)
4647

4748

4849
class ServeComponentProvider(ComponentProvider):
4950
# TODO(aivanou): Remove dryrun and test e2e serve component+app
5051
def get_app_def(self) -> AppDef:
5152
return serve_components.torchserve(
52-
model_path="",
53-
management_api="",
53+
model_path="dummy_path",
54+
management_api="dummy_api",
5455
image=self._image,
5556
dryrun=True,
5657
)

torchx/schedulers/aws_batch_scheduler.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
for how to create a image repository.
3636
"""
3737

38+
import threading
3839
from dataclasses import dataclass
3940
from datetime import datetime
4041
from typing import Dict, Iterable, Mapping, Optional, Any, TYPE_CHECKING, Tuple
@@ -120,6 +121,19 @@ def __repr__(self) -> str:
120121
return str(self)
121122

122123

124+
def _thread_local_session() -> "boto3.session.Session":
125+
KEY = "torchx_aws_batch_session"
126+
local = threading.local()
127+
if hasattr(local, KEY):
128+
# pyre-ignore[16]
129+
return getattr(local, KEY)
130+
import boto3.session
131+
132+
session = boto3.session.Session()
133+
setattr(local, KEY, session)
134+
return session
135+
136+
123137
class AWSBatchScheduler(Scheduler, DockerWorkspace):
124138
"""
125139
AWSBatchScheduler is a TorchX scheduling interface to AWS Batch.
@@ -174,20 +188,16 @@ def __init__(
174188
@property
175189
# pyre-fixme[3]: Return annotation cannot be `Any`.
176190
def _client(self) -> Any:
177-
if self.__client is None:
178-
import boto3
179-
180-
self.__client = boto3.client("batch")
181-
return self.__client
191+
if self.__client:
192+
return self.__client
193+
return _thread_local_session().client("batch")
182194

183195
@property
184196
# pyre-fixme[3]: Return annotation cannot be `Any`.
185197
def _log_client(self) -> Any:
186-
if self.__log_client is None:
187-
import boto3
188-
189-
self.__log_client = boto3.client("logs")
190-
return self.__log_client
198+
if self.__log_client:
199+
return self.__log_client
200+
return _thread_local_session().client("logs")
191201

192202
def schedule(self, dryrun_info: AppDryRunInfo[BatchJob]) -> str:
193203
cfg = dryrun_info._cfg

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,8 @@ def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
291291
"tasks": tasks,
292292
"maxRetry": job_retries,
293293
"plugins": {
294-
"svc": [],
294+
# https://github.com/volcano-sh/volcano/issues/533
295+
"svc": ["--publish-not-ready-addresses"],
295296
"env": [],
296297
},
297298
},

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ def test_submit_dryrun(self) -> None:
193193
maxRetry: 3
194194
plugins:
195195
env: []
196-
svc: []
196+
svc:
197+
- --publish-not-ready-addresses
197198
queue: testqueue
198199
schedulerName: volcano
199200
tasks:

0 commit comments

Comments
 (0)