
Commit dabeb6f

SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
This patch allows the FaultToleranceTest to work in newer versions of Docker. See https://spark-project.atlassian.net/browse/SPARK-1136 for more details.

Besides changing the Docker and FaultToleranceTest internals, this patch also changes the behavior of Master to accept new Workers which share an address with a Worker that we are currently trying to recover. This can only happen when the Worker itself was restarted and got the same IP address/port at the same time as a Master recovery occurs.

Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed.

Author: Aaron Davidson <[email protected]>

Closes #5 from aarondav/zookeeper and squashes the following commits:

5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
1 parent 33baf14 commit dabeb6f
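
The Master-side behavioral change described above is small enough to model in isolation. The following is a simplified, hypothetical sketch of the rule this patch adds to the Master's worker registration (the real change is in the Master.scala hunk below); RegistrationSketch, WorkerEntry, registry, and shouldAccept are illustrative names, not Spark internals.

    object RegistrationSketch {
      object WorkerState extends Enumeration {
        val ALIVE, DEAD, UNKNOWN = Value
      }

      case class WorkerEntry(address: String, state: WorkerState.Value)

      /** True if a worker registering from `address` should be accepted. */
      def shouldAccept(registry: Map[String, WorkerEntry], address: String): Boolean =
        registry.get(address) match {
          case None =>
            true   // no existing worker at this address: ordinary new registration
          case Some(old) if old.state == WorkerState.UNKNOWN =>
            // The old entry never responded during master recovery, so its worker must be
            // dead and was likely restarted with the same IP/port; accept the replacement
            // (the actual Master also removes the stale entry before registering the new worker).
            true
          case Some(_) =>
            false  // a live worker already owns this address: reject the duplicate
        }
    }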

File tree

6 files changed, +73 -12 lines changed


core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 35 additions & 6 deletions
@@ -30,31 +30,41 @@ import scala.sys.process._
 import org.json4s._
 import org.json4s.jackson.JsonMethods
 
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.master.RecoveryState
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
 
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
  * In order to mimic a real distributed cluster more closely, Docker is used.
  * Execute using
- * ./spark-class org.apache.spark.deploy.FaultToleranceTest
+ * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
  *
- * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
+ * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
+ * *and* SPARK_JAVA_OPTS:
  *   - spark.deploy.recoveryMode=ZOOKEEPER
  *   - spark.deploy.zookeeper.url=172.17.42.1:2181
  * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
  *
+ * In case of failure, make sure to kill off prior docker containers before restarting:
+ *   docker kill $(docker ps -q)
+ *
  * Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
  * working installation of Docker. In addition to having Docker, the following are assumed:
  *  - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
  *  - The docker images tagged spark-test-master and spark-test-worker are built from the
  *    docker/ directory. Run 'docker/spark-test/build' to generate these.
  */
 private[spark] object FaultToleranceTest extends App with Logging {
+
+  val conf = new SparkConf()
+  val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+
   val masters = ListBuffer[TestMasterInfo]()
   val workers = ListBuffer[TestWorkerInfo]()
   var sc: SparkContext = _
 
+  val zk = SparkCuratorUtil.newClient(conf)
+
   var numPassed = 0
   var numFailed = 0
 
@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
       sc = null
     }
     terminateCluster()
+
+    // Clear ZK directories in between tests (for speed purposes)
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
   }
 
   test("sanity-basic") {
@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
     try {
       fn
       numPassed += 1
+      logInfo("==============================================")
       logInfo("Passed: " + name)
+      logInfo("==============================================")
     } catch {
       case e: Exception =>
         numFailed += 1
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
         logError("FAILED: " + name, e)
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+        sys.exit(1)
     }
     afterEach()
   }
 
   def addMasters(num: Int) {
+    logInfo(s">>>>> ADD MASTERS $num <<<<<")
     (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }
 
   def addWorkers(num: Int) {
+    logInfo(s">>>>> ADD WORKERS $num <<<<<")
     val masterUrls = getMasterUrls(masters)
     (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }
 
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
   def createClient() = {
+    logInfo(">>>>> CREATE CLIENT <<<<<")
     if (sc != null) { sc.stop() }
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def killLeader(): Unit = {
+    logInfo(">>>>> KILL LEADER <<<<<")
     masters.foreach(_.readState())
     val leader = getLeader
     masters -= leader
@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
 
   def terminateCluster() {
+    logInfo(">>>>> TERMINATE CLUSTER <<<<<")
     masters.foreach(_.kill())
     workers.foreach(_.kill())
     masters.clear()
@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
    * are all alive in a proper configuration (e.g., only one leader).
    */
   def assertValidClusterState() = {
+    logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
     assertUsable()
     var numAlive = 0
     var numStandby = 0
@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
 
       val workers = json \ "workers"
       val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
-      liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+      // Extract the worker IP from "webuiaddress" (rather than "host") because the host name
+      // on containers is a weird hash instead of the actual IP address.
+      liveWorkerIPs = liveWorkers.map {
+        w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
+      }
 
       numLiveApps = (json \ "activeapps").children.size
 
@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
   def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
     val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
 
-    val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+    val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
     logDebug("Run command: " + cmd)
     cmd
   }
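
The updated doc comment above requires spark.deploy.recoveryMode and spark.deploy.zookeeper.url to be present in both SPARK_DAEMON_JAVA_OPTS and SPARK_JAVA_OPTS. As a minimal sketch (not part of the patch), the same keys expressed as SparkConf entries would look like this; the test itself picks them up as -D system properties from the environment:

    import org.apache.spark.SparkConf

    // Sketch only: the configuration keys the test expects, with the values from the doc comment.
    val conf = new SparkConf()
      .set("spark.deploy.recoveryMode", "ZOOKEEPER")
      .set("spark.deploy.zookeeper.url", "172.17.42.1:2181") // default docker host IP, default ZK port
      .set("spark.deploy.zookeeper.dir", "/spark")           // optional; "/spark" is the default ZK_DIR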

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 9 additions & 2 deletions
@@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
 
     val workerAddress = worker.actor.path.address
     if (addressToWorker.contains(workerAddress)) {
-      logInfo("Attempted to re-register worker at same address: " + workerAddress)
-      return false
+      val oldWorker = addressToWorker(workerAddress)
+      if (oldWorker.state == WorkerState.UNKNOWN) {
+        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
+        // The old worker must thus be dead, so we will remove it and accept the new worker.
+        removeWorker(oldWorker)
+      } else {
+        logInfo("Attempted to re-register worker at same address: " + workerAddress)
+        return false
+      }
     }
 
     workers += worker

core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala

Lines changed: 12 additions & 1 deletion
@@ -17,11 +17,13 @@
 
 package org.apache.spark.deploy.master
 
-import org.apache.spark.{SparkConf, Logging}
+import scala.collection.JavaConversions._
+
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.zookeeper.KeeperException
 
+import org.apache.spark.{Logging, SparkConf}
 
 object SparkCuratorUtil extends Logging {
 
@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
       }
     }
   }
+
+  def deleteRecursive(zk: CuratorFramework, path: String) {
+    if (zk.checkExists().forPath(path) != null) {
+      for (child <- zk.getChildren.forPath(path)) {
+        zk.delete().forPath(path + "/" + child)
+      }
+      zk.delete().forPath(path)
+    }
+  }
 }
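
For orientation, a hedged usage sketch of the new deleteRecursive helper, mirroring how FaultToleranceTest clears ZooKeeper state between runs; the ZooKeeper URL is a placeholder and the paths assume the default "/spark" directory:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.master.SparkCuratorUtil

    // Sketch: wipe the test's ZooKeeper directories, then release the Curator client.
    val conf = new SparkConf().set("spark.deploy.zookeeper.url", "172.17.42.1:2181")
    val zk = SparkCuratorUtil.newClient(conf)
    try {
      SparkCuratorUtil.deleteRecursive(zk, "/spark/spark_leader")
      SparkCuratorUtil.deleteRecursive(zk, "/spark/master_status")
    } finally {
      zk.close() // CuratorFramework is Closeable
    }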

docker/README.md

Lines changed: 3 additions & 1 deletion
@@ -2,4 +2,6 @@ Spark docker files
 ===========
 
 Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
-as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+
+Tested with Docker version 0.8.1.

docker/spark-test/master/default_cmd

Lines changed: 7 additions & 1 deletion
@@ -19,4 +19,10 @@
 
 IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
 echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP

docker/spark-test/worker/default_cmd

Lines changed: 7 additions & 1 deletion
@@ -19,4 +19,10 @@
 
 IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
 echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1
