Skip to content

Commit 54c0e82

Browse files
authored
[SPARK-501] Support CNI network labels: added integration test (apache#167)
1 parent 6ec5bb9 commit 54c0e82

File tree

3 files changed

+53
-4
lines changed

3 files changed

+53
-4
lines changed

tests/test_recovery.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ def test_disconnect_from_master():
3535
"--conf", "spark.cores.max=1"])
3636

3737
# Wait until executor is running
38-
LOGGER.info("Waiting for executor task to be RUNNING...")
39-
shakedown.wait_for(lambda: utils.is_service_ready(LONG_RUNNING_FW_NAME, LONG_RUNNING_FW_NUM_TASKS),
40-
ignore_exceptions=False,
41-
timeout_seconds=600)
38+
utils.wait_for_executors_running(LONG_RUNNING_FW_NAME, LONG_RUNNING_FW_NUM_TASKS)
4239

4340
# Block the driver's connection to Mesos master
4441
framework_info = shakedown.get_service(LONG_RUNNING_FW_NAME)

tests/test_spark.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
LOGGER = logging.getLogger(__name__)
2222
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
23+
SPARK_PI_FW_NAME = "Spark Pi"
24+
CNI_TEST_NUM_EXECUTORS = 1
2325

2426

2527
def setup_module(module):
@@ -113,6 +115,49 @@ def test_cni():
113115
"--class", "org.apache.spark.examples.SparkPi"])
114116

115117

118+
@pytest.mark.skip("Enable when SPARK-21694 is merged and released in DC/OS Spark")
119+
@pytest.mark.sanity
120+
def test_cni_labels():
121+
SPARK_EXAMPLES="http://downloads.mesosphere.com/spark/assets/spark-examples_2.11-2.0.1.jar"
122+
driver_task_id = utils.submit_job(SPARK_EXAMPLES,
123+
"3000", # Long enough to examine the Driver's & Executor's task infos
124+
["--conf", "spark.mesos.network.name=dcos",
125+
"--conf", "spark.mesos.network.labels=key1:val1,key2:val2",
126+
"--conf", "spark.cores.max={}".format(CNI_TEST_NUM_EXECUTORS),
127+
"--class", "org.apache.spark.examples.SparkPi"])
128+
129+
# Wait until executors are running
130+
utils.wait_for_executors_running(SPARK_PI_FW_NAME, CNI_TEST_NUM_EXECUTORS)
131+
132+
# Check for network name / labels in Driver task info
133+
driver_task = shakedown.get_task(driver_task_id, completed=False)
134+
_check_task_network_info(driver_task)
135+
136+
# Check for network name / labels in Executor task info
137+
executor_task = shakedown.get_service_tasks(SPARK_PI_FW_NAME)[0]
138+
_check_task_network_info(executor_task)
139+
140+
# Check job output
141+
utils.check_job_output(driver_task_id, "Pi is roughly 3")
142+
143+
144+
def _check_task_network_info(task):
145+
# Expected: "network_infos":[{
146+
# "name":"dcos",
147+
# "labels":{
148+
# "labels":[
149+
# {"key":"key1","value":"val1"},
150+
# {"key":"key2","value":"val2"}]}}]
151+
network_info = task['container']['network_infos'][0]
152+
assert network_info['name'] == "dcos"
153+
labels = network_info['labels']['labels']
154+
assert len(labels) == 2
155+
assert labels[0]['key'] == "key1"
156+
assert labels[0]['value'] == "val1"
157+
assert labels[1]['key'] == "key2"
158+
assert labels[1]['value'] == "val2"
159+
160+
116161
@pytest.mark.sanity
117162
def test_s3():
118163
linecount_path = os.path.join(THIS_DIR, 'resources', 'linecount.txt')

tests/utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@ def submit_job(app_url, app_args, args=[]):
206206
return match.group(1)
207207

208208

209+
def wait_for_executors_running(framework_name, num_executors):
210+
LOGGER.info("Waiting for executor task to be RUNNING...")
211+
shakedown.wait_for(lambda: is_service_ready(framework_name, num_executors),
212+
ignore_exceptions=False,
213+
timeout_seconds=600)
214+
215+
209216
def _task_log(task_id, filename=None):
210217
cmd = "dcos task log --completed --lines=1000 {}".format(task_id) + \
211218
("" if filename is None else " {}".format(filename))

0 commit comments

Comments
 (0)