@@ -30,31 +30,41 @@ import scala.sys.process._
 import org.json4s._
 import org.json4s.jackson.JsonMethods
 
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.master.RecoveryState
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
 
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
  * In order to mimic a real distributed cluster more closely, Docker is used.
  * Execute using
- * ./spark-class org.apache.spark.deploy.FaultToleranceTest
+ * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
  *
- * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
+ * Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
+ * *and* SPARK_JAVA_OPTS:
  *   - spark.deploy.recoveryMode=ZOOKEEPER
  *   - spark.deploy.zookeeper.url=172.17.42.1:2181
  * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
  *
+ * In case of failure, make sure to kill off prior docker containers before restarting:
+ *   docker kill $(docker ps -q)
+ *
  * Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
  * working installation of Docker. In addition to having Docker, the following are assumed:
  *   - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
  *   - The docker images tagged spark-test-master and spark-test-worker are built from the
  *     docker/ directory. Run 'docker/spark-test/build' to generate these.
  */
 private[spark] object FaultToleranceTest extends App with Logging {
+
+  val conf = new SparkConf()
+  val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+
   val masters = ListBuffer[TestMasterInfo]()
   val workers = ListBuffer[TestWorkerInfo]()
   var sc: SparkContext = _
 
+  val zk = SparkCuratorUtil.newClient(conf)
+
   var numPassed = 0
   var numFailed = 0
 
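A note on how the properties listed in the comment reach the test (a minimal sketch, not part of the patch): the `-Dspark.*` flags in SPARK_DAEMON_JAVA_OPTS and SPARK_JAVA_OPTS become JVM system properties, and a default `SparkConf` loads every system property starting with "spark.". That is why the new `ZK_DIR` val above can fall back to "/spark" when spark.deploy.zookeeper.dir is unset. The object name here is invented for illustration:

    import org.apache.spark.SparkConf

    // Illustrative only: simulate what -Dspark.deploy.* flags in SPARK_JAVA_OPTS provide.
    object ZkConfSketch extends App {
      sys.props("spark.deploy.recoveryMode") = "ZOOKEEPER"
      sys.props("spark.deploy.zookeeper.url") = "172.17.42.1:2181"

      val conf = new SparkConf() // loads all spark.* system properties by default
      println(conf.get("spark.deploy.recoveryMode"))            // ZOOKEEPER
      println(conf.get("spark.deploy.zookeeper.dir", "/spark")) // falls back to /spark
    }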
@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
       sc = null
     }
     terminateCluster()
+
+    // Clear ZK directories in between tests (for speed purposes)
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
   }
 
   test("sanity-basic") {
@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
     try {
       fn
       numPassed += 1
+      logInfo("==============================================")
       logInfo("Passed: " + name)
+      logInfo("==============================================")
     } catch {
       case e: Exception =>
         numFailed += 1
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
         logError("FAILED: " + name, e)
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+        sys.exit(1)
     }
     afterEach()
   }
 
   def addMasters(num: Int) {
+    logInfo(s">>>>> ADD MASTERS $num <<<<<")
     (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }
 
   def addWorkers(num: Int) {
+    logInfo(s">>>>> ADD WORKERS $num <<<<<")
     val masterUrls = getMasterUrls(masters)
     (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }
 
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
   def createClient() = {
+    logInfo(">>>>> CREATE CLIENT <<<<<")
     if (sc != null) { sc.stop() }
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
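Pieced together from the fragments in the hunk above, the harness method the new banners land in has roughly this shape (the `test(name)(fn)` signature is inferred from call sites such as `test("sanity-basic")` and is not shown verbatim in this diff):

    // Reconstructed sketch of the enclosing test harness:
    def test(name: String)(fn: => Unit) {
      try {
        fn
        numPassed += 1
        logInfo("==============================================")
        logInfo("Passed: " + name)
        logInfo("==============================================")
      } catch {
        case e: Exception =>
          numFailed += 1
          logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
          logError("FAILED: " + name, e)
          logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
          sys.exit(1) // fail fast: later tests depend on a clean cluster
      }
      afterEach()
    }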
@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def killLeader(): Unit = {
+    logInfo(">>>>> KILL LEADER <<<<<")
     masters.foreach(_.readState())
     val leader = getLeader
     masters -= leader
@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
 
   def terminateCluster() {
+    logInfo(">>>>> TERMINATE CLUSTER <<<<<")
     masters.foreach(_.kill())
     workers.foreach(_.kill())
     masters.clear()
@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
    * are all alive in a proper configuration (e.g., only one leader).
    */
   def assertValidClusterState() = {
+    logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
     assertUsable()
     var numAlive = 0
     var numStandby = 0
@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
 
       val workers = json \ "workers"
       val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
-      liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+      // Extract the worker IP from "webuiaddress" (rather than "host") because the host name
+      // on containers is a weird hash instead of the actual IP address.
+      liveWorkerIPs = liveWorkers.map {
+        w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
+      }
 
       numLiveApps = (json \ "activeapps").children.size
 
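The new mapping simply peels the scheme and the default worker web UI port off the reported address. With a made-up address, the stripPrefix/stripSuffix chain behaves like this:

    // Toy check of the string handling above (the address is invented):
    val webuiaddress = "http://172.17.0.3:8081"
    val ip = webuiaddress.stripPrefix("http://").stripSuffix(":8081")
    assert(ip == "172.17.0.3")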
@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
   def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
     val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
 
-    val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+    val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
     logDebug("Run command: " + cmd)
     cmd
   }
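For reference, with a mount directory supplied, the format string above assembles a command along these lines (`-privileged` is the single-dash long-option style of the Docker CLI of that era; the values are invented):

    // Invented example values, combined exactly as makeRunCmd does:
    val mountDir = "/tmp/spark:/mnt"
    val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
    val cmd = "docker run -privileged %s %s %s".format(mountCmd, "spark-test-master", "")
    // cmd == "docker run -privileged  -v /tmp/spark:/mnt spark-test-master "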