
Commit b848771

Marcelo Vanzin authored and pwendell committed
[SPARK-2778] [yarn] Add yarn integration tests.
This patch adds a couple of, currently, very simple integration tests to make sure both client and cluster modes are working. The tests don't do much yet other than run a simple job, but the plan is to enhance them after we get the framework in.

The cluster tests are noisy, so redirect all log output to a file like other tests do. Copying the conf around sucks but it's less work than messing with maven/sbt and having to clean up other projects.

Note the test is only added for yarn-stable. The code compiles against yarn-alpha but there are two issues I ran into that I could not overcome:
- an old netty dependency kept creeping into the classpath and causing akka to not work, when using sbt; the old netty was correctly suppressed under maven.
- MiniYARNCluster kept failing to execute containers because it did not create the NM's local dir itself; this is apparently a known behavior, but I'm not sure how to work around it.

None of those issues are present with the stable Yarn.

Also, these tests are a little slow to run. Apparently Spark doesn't yet tag tests (so that these could be isolated in a "slow" batch), so this is something to keep in mind.

Author: Marcelo Vanzin <[email protected]>

Closes apache#2257 from vanzin/yarn-tests and squashes the following commits:

6d5b84e [Marcelo Vanzin] Fix wrong system property being set.
8b0933d [Marcelo Vanzin] Merge branch 'master' into yarn-tests
5c2b56f [Marcelo Vanzin] Use custom log4j conf for Yarn containers.
ec73f17 [Marcelo Vanzin] More review feedback.
67f5b02 [Marcelo Vanzin] Review feedback.
f01517c [Marcelo Vanzin] Review feedback.
68fbbbf [Marcelo Vanzin] Use older constructor available in older Hadoop releases.
d07ef9a [Marcelo Vanzin] Merge branch 'master' into yarn-tests
add8416 [Marcelo Vanzin] [SPARK-2778] [yarn] Add yarn integration tests.
1 parent 8ca4ecb commit b848771
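
The commit message above notes that Spark did not yet tag tests so that slow suites could be isolated. As an illustration only (not part of this commit), ScalaTest's tagging support could mark such suites for a separate "slow" batch; the tag name below is hypothetical:

import org.scalatest.{FunSuite, Tag}

// Hypothetical tag object; Spark defined no such tag at the time of this commit.
object SlowTest extends Tag("org.apache.spark.tags.SlowTest")

class TaggingExample extends FunSuite {
  // Attaching the tag lets a default run exclude it via ScalaTest's
  // runner argument "-l org.apache.spark.tags.SlowTest", while a
  // dedicated "slow" batch includes it with "-n".
  test("a slow integration test", SlowTest) {
    // ... slow test body ...
  }
}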

File tree

8 files changed: +229 −10 lines

pom.xml

Lines changed: 30 additions & 1 deletion

@@ -712,6 +712,35 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${yarn.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
@@ -1187,7 +1216,7 @@
       <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
-        <version>3.4.5-mapr-1406</version>
+        <version>3.4.5-mapr-1406</version>
       </dependency>
     </dependencies>
   </profile>

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 5 additions & 5 deletions

@@ -401,17 +401,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
          // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
          status = FinalApplicationStatus.SUCCEEDED
        } catch {
-          case e: InvocationTargetException => {
+          case e: InvocationTargetException =>
            e.getCause match {
-              case _: InterruptedException => {
+              case _: InterruptedException =>
                // Reporter thread can interrupt to stop user class
-              }
+
+              case e => throw e
            }
-          }
        } finally {
          logDebug("Finishing main")
+          finalStatus = status
        }
-        finalStatus = status
      }
    }
    userClassThread.setName("Driver")

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 1 addition & 1 deletion

@@ -348,7 +348,7 @@ private[spark] trait ClientBase extends Logging {
    }

    // For log4j configuration to reference
-    javaOpts += "-D=spark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClass =
      if (args.userClass != null) {

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 1 addition & 1 deletion

@@ -98,7 +98,7 @@ trait ExecutorRunnableUtil extends Logging {
    */

    // For log4j configuration to reference
-    javaOpts += "-D=spark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
      "-server",

yarn/pom.xml

Lines changed: 1 addition & 2 deletions

@@ -140,15 +140,14 @@
        <configuration>
          <environmentVariables>
            <SPARK_HOME>${basedir}/../..</SPARK_HOME>
-            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
          </environmentVariables>
        </configuration>
      </plugin>
    </plugins>

  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-
+
  <resources>
    <resource>
      <directory>../common/src/main/resources</directory>

yarn/stable/pom.xml

Lines changed: 9 additions & 0 deletions

@@ -32,4 +32,13 @@
  <packaging>jar</packaging>
  <name>Spark Project YARN Stable API</name>

+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
 </project>
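
Note that this dependency declares no <version> and no exclusions; those are presumably managed by the hadoop-yarn-server-tests entry added to the root pom.xml above, which pins ${yarn.version} and suppresses the conflicting asm, netty, servlet-api, and commons-logging artifacts.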
log4j.properties (new test resource under yarn/stable)

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
YarnClusterSuite.scala (new test suite under yarn/stable)

Lines changed: 154 additions & 0 deletions

@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
+
+  // log4j configuration for the Yarn containers, so that their output is collected
+  // by Yarn instead of trying to overwrite unit-tests.log.
+  private val LOG4J_CONF = """
+    |log4j.rootCategory=DEBUG, console
+    |log4j.appender.console=org.apache.log4j.ConsoleAppender
+    |log4j.appender.console.target=System.err
+    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
+    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+    """.stripMargin
+
+  private var yarnCluster: MiniYARNCluster = _
+  private var tempDir: File = _
+  private var fakeSparkJar: File = _
+  private var oldConf: Map[String, String] = _
+
+  override def beforeAll() {
+    tempDir = Utils.createTempDir()
+
+    val logConfDir = new File(tempDir, "log4j")
+    logConfDir.mkdir()
+
+    val logConfFile = new File(logConfDir, "log4j.properties")
+    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
+
+    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
+      sys.props("java.class.path")
+
+    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+
+    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
+    yarnCluster.init(new YarnConfiguration())
+    yarnCluster.start()
+    yarnCluster.getConfig().foreach { e =>
+      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
+    }
+
+    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
+    sys.props += ("spark.executor.instances" -> "1")
+    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
+    sys.props += ("spark.executor.extraClassPath" -> childClasspath)
+
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    yarnCluster.stop()
+    sys.props.retain { case (k, v) => !k.startsWith("spark.") }
+    sys.props ++= oldConf
+    super.afterAll()
+  }
+
+  test("run Spark in yarn-client mode") {
+    var result = File.createTempFile("result", null, tempDir)
+    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+    var result = File.createTempFile("result", null, tempDir)
+
+    // The Client object will call System.exit() after the job is done, and we don't want
+    // that because it messes up the scalatest monitoring. So replicate some of what main()
+    // does here.
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--num-executors", "1")
+    val sparkConf = new SparkConf()
+    val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val clientArgs = new ClientArguments(args, sparkConf)
+    new Client(clientArgs, yarnConf, sparkConf).run()
+    checkResult(result)
+  }
+
+  /**
+   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
+   * any sort of error when the job process finishes successfully, but the job itself fails. So
+   * the tests enforce that something is written to a file after everything is ok to indicate
+   * that the job succeeded.
+   */
+  private def checkResult(result: File) = {
+    var resultString = Files.toString(result, Charsets.UTF_8)
+    resultString should be ("success")
+  }
+
+}
+
+private object YarnClusterDriver extends Logging with Matchers {
+
+  def main(args: Array[String]) = {
+    if (args.length != 2) {
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriver [master] [result file]
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf().setMaster(args(0))
+      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      val data = sc.parallelize(1 to 4, 4).collect().toSet
+      data should be (Set(1, 2, 3, 4))
+      result = "success"
+    } finally {
+      sc.stop()
+      Files.write(result, status, Charsets.UTF_8)
+    }
+  }
+
+}
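
In beforeAll(), the suite forwards the MiniYARNCluster's configuration to the job it launches by stashing every entry as a "spark.hadoop."-prefixed system property; new SparkConf() instances pick those properties up, and SparkHadoopUtil.newConfiguration copies them into the Hadoop Configuration with the prefix stripped. A minimal sketch of that copying step, assuming only the standard prefix convention (the helper and object names are ours, not Spark's):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

object HadoopConfBridge {
  // Copy every "spark.hadoop.foo" entry of the SparkConf into a Hadoop
  // Configuration as "foo"; this is how the suite's system properties
  // (e.g. the mini cluster's resource manager address) reach the YARN client.
  def hadoopConfFrom(sparkConf: SparkConf): Configuration = {
    val hadoopConf = new Configuration()
    sparkConf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
      }
    }
    hadoopConf
  }
}

This is why the yarn-cluster test can simply call SparkHadoopUtil.get.newConfiguration(sparkConf) and end up talking to the mini cluster rather than a default-configured one.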
