
Commit 57ee6f0

SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
Refactored Leader Election agent and added a RecoveryModeFactory. Implemented new proposal with some convenient modifications. Added a read method.
1 parent 293a0b5 commit 57ee6f0

10 files changed, +211 -153 lines changed (6 shown in this excerpt)

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 3 additions & 1 deletion
@@ -24,9 +24,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 
-private[spark] class ApplicationInfo(
+@DeveloperApi
+class ApplicationInfo(
     val startTime: Long,
     val id: String,
     val desc: ApplicationDescription,

core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala

Lines changed: 3 additions & 1 deletion
@@ -19,9 +19,11 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.DriverDescription
 
-private[spark] class DriverInfo(
+@DeveloperApi
+class DriverInfo(
     val startTime: Long,
     val id: String,
     val desc: DriverDescription,

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 18 additions & 44 deletions
@@ -18,10 +18,12 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-
-import akka.serialization.Serialization
+import java.nio.ByteBuffer
 
 import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer
+
+import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -32,65 +34,39 @@ import org.apache.spark.Logging
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serialization)
+    val serialization: Serializer)
   extends PersistenceEngine with Logging {
 
+  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
-  override def addApplication(app: ApplicationInfo) {
-    val appFile = new File(dir + File.separator + "app_" + app.id)
-    serializeIntoFile(appFile, app)
-  }
-
-  override def removeApplication(app: ApplicationInfo) {
-    new File(dir + File.separator + "app_" + app.id).delete()
-  }
-
-  override def addDriver(driver: DriverInfo) {
-    val driverFile = new File(dir + File.separator + "driver_" + driver.id)
-    serializeIntoFile(driverFile, driver)
+  override def persist(name: String, obj: Object): Unit = {
+    serializeIntoFile(new File(dir + File.separator + name), obj)
   }
 
-  override def removeDriver(driver: DriverInfo) {
-    new File(dir + File.separator + "driver_" + driver.id).delete()
+  override def unpersist(name: String): Unit = {
+    new File(dir + File.separator + name).delete()
   }
 
-  override def addWorker(worker: WorkerInfo) {
-    val workerFile = new File(dir + File.separator + "worker_" + worker.id)
-    serializeIntoFile(workerFile, worker)
-  }
-
-  override def removeWorker(worker: WorkerInfo) {
-    new File(dir + File.separator + "worker_" + worker.id).delete()
-  }
-
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
-    val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
-    val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
-    val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, drivers, workers)
+  override def read[T: ClassTag](name: String) = {
+    val files = new File(dir).listFiles().filter(_.getName.startsWith(name))
+    files.map(deserializeFromFile[T])
   }
 
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
 
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-
-    val out = new FileOutputStream(file)
+    val out = serializer.serializeStream(new FileOutputStream(file))
     try {
-      out.write(serialized)
+      out.writeObject(value)
     } finally {
       out.close()
     }
+
   }
 
-  def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
+  def deserializeFromFile[T](file: File): T = {
    val fileData = new Array[Byte](file.length().asInstanceOf[Int])
    val dis = new DataInputStream(new FileInputStream(file))
    try {
@@ -99,8 +75,6 @@ private[spark] class FileSystemPersistenceEngine(
      dis.close()
    }
 
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
+    serializer.deserializeStream(dis).readObject()
   }
 }
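
Note: the persistence API is now keyed by plain file names rather than typed add/remove methods. A quick usage sketch, not part of this commit (and only compilable from code inside the spark package, since the class is private[spark]); JavaSerializer and the directory path are assumptions for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// One file per persisted name is created under the given directory.
val engine = new FileSystemPersistenceEngine(
  "/tmp/spark-recovery", new JavaSerializer(new SparkConf()))

engine.persist("note_hello", "a serializable payload")  // writes file note_hello
val notes = engine.read[String]("note_")  // loads every file whose name starts with "note_"
engine.unpersist("note_hello")            // deletes the file again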

core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala

Lines changed: 17 additions & 20 deletions
@@ -17,30 +17,27 @@
 
 package org.apache.spark.deploy.master
 
-import akka.actor.{Actor, ActorRef}
-
-import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+import org.apache.spark.annotation.DeveloperApi
 
 /**
- * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
- * is the only Master serving requests.
- * In addition to the API provided, the LeaderElectionAgent will use of the following messages
- * to inform the Master of leader changes:
- * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
- * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ * :: DeveloperApi ::
+ *
+ * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
  */
-private[spark] trait LeaderElectionAgent extends Actor {
-  // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
-  val masterActor: ActorRef
+@DeveloperApi
+trait LeaderElectionAgent {
+  val masterActor: LeaderElectable
+  def stop() {} // to avoid noops in implementations.
 }
 
-/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
-  override def preStart() {
-    masterActor ! ElectedLeader
-  }
+@DeveloperApi
+trait LeaderElectable {
+  def electedLeader()
+  def revokedLeadership()
+}
 
-  override def receive = {
-    case _ =>
-  }
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+  extends LeaderElectionAgent {
+  masterActor.electedLeader()
 }
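
Note: with the Actor dependency gone, a third-party election agent only has to hold a LeaderElectable and fire its two callbacks. A hedged skeleton, not in this commit (the coordination hook is hypothetical; only the two traits above come from the diff):

// Sketch of a custom agent; onLeadershipChange would be driven by some
// external coordination service, which is assumed here.
class MyLeaderElectionAgent(val masterActor: LeaderElectable)
  extends LeaderElectionAgent {

  def onLeadershipChange(isLeader: Boolean): Unit = {
    if (isLeader) masterActor.electedLeader()
    else masterActor.revokedLeadership()
  }

  override def stop(): Unit = {
    // Unregister from the coordinator here; the trait's default stop() is a no-op.
  }
}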

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 25 additions & 18 deletions
@@ -50,7 +50,7 @@ private[spark] class Master(
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with ActorLogReceive with Logging {
+  extends Actor with ActorLogReceive with Logging with LeaderElectable {
 
   import context.dispatcher // to use Akka's scheduler.schedule()
 
@@ -61,7 +61,6 @@ private[spark] class Master(
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
-  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 
   val workers = new HashSet[WorkerInfo]
@@ -103,7 +102,7 @@ private[spark] class Master(
 
   var persistenceEngine: PersistenceEngine = _
 
-  var leaderElectionAgent: ActorRef = _
+  var leaderElectionAgent: LeaderElectionAgent = _
 
   private var recoveryCompletionTask: Cancellable = _
 
@@ -130,23 +129,24 @@ private[spark] class Master(
     masterMetricsSystem.start()
     applicationMetricsSystem.start()
 
-    persistenceEngine = RECOVERY_MODE match {
+    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
+        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "CUSTOM" =>
+        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
+        val factory = clazz.getConstructor(conf.getClass)
+          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
-        new BlackHolePersistenceEngine()
+        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
     }
-
-    leaderElectionAgent = RECOVERY_MODE match {
-      case "ZOOKEEPER" =>
-        context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
-      case _ =>
-        context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
-    }
+    persistenceEngine = persistenceEngine_
+    leaderElectionAgent = leaderElectionAgent_
   }
 
   override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -165,7 +165,15 @@ private[spark] class Master(
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
-    context.stop(leaderElectionAgent)
+    leaderElectionAgent.stop()
+  }
+
+  override def electedLeader() {
+    self ! ElectedLeader
+  }
+
+  override def revokedLeadership() {
+    self ! RevokedLeadership
   }
 
   override def receiveWithLogging = {
@@ -732,8 +740,7 @@ private[spark] class Master(
       msg = URLEncoder.encode(msg, "UTF-8")
       app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
       false
-    }
-  }
+  } }
 
   /** Generate a new app ID given a app's submission date */
   def newApplicationId(submitDate: Date): String = {
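
Note: the new "CUSTOM" branch carries the pluggability — the Master reflectively instantiates whatever class spark.deploy.recoveryMode.factory names, passing it the SparkConf. StandaloneRecoveryModeFactory itself is defined in one of the files not shown in this excerpt, so the sketch below infers its shape (an abstract class taking a SparkConf, with the two create* methods invoked above) purely from these call sites:

import org.apache.spark.SparkConf

// Hypothetical factory reusing the no-op engine and single-node agent from this commit.
class MyRecoveryModeFactory(conf: SparkConf)
  extends StandaloneRecoveryModeFactory(conf) {

  override def createPersistenceEngine(): PersistenceEngine =
    new BlackHolePersistenceEngine()  // replace with a store-backed engine

  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
    new MonarchyLeaderAgent(master)   // immediate, permanent leadership
}

It would be selected by setting spark.deploy.recoveryMode=CUSTOM and pointing spark.deploy.recoveryMode.factory at the factory's fully qualified class name.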

core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala

Lines changed: 41 additions & 14 deletions
@@ -17,6 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import org.apache.spark.annotation.DeveloperApi
+
+import scala.reflect.ClassTag
+
 /**
  * Allows Master to persist any state that is necessary in order to recover from a failure.
  * The following semantics are required:
@@ -26,35 +30,58 @@ package org.apache.spark.deploy.master
 * we might not have yet deleted apps or workers that finished (so their liveness must be verified
 * during recovery).
 */
-private[spark] trait PersistenceEngine {
-  def addApplication(app: ApplicationInfo)
+@DeveloperApi
+trait PersistenceEngine {
+
+  def persist(name: String, obj: Object)
+
+  def unpersist(name: String)
+
+  def read[T: ClassTag](name: String): Seq[T]
 
-  def removeApplication(app: ApplicationInfo)
+  def addApplication(app: ApplicationInfo): Unit = {
+    persist("app_" + app.id, app)
+  }
 
-  def addWorker(worker: WorkerInfo)
+  def removeApplication(app: ApplicationInfo): Unit = {
+    unpersist("app_" + app.id)
+  }
 
-  def removeWorker(worker: WorkerInfo)
+  def addWorker(worker: WorkerInfo): Unit = {
+    persist("worker_" + worker.id, worker)
+  }
 
-  def addDriver(driver: DriverInfo)
+  def removeWorker(worker: WorkerInfo): Unit = {
+    unpersist("worker_" + worker.id)
+  }
 
-  def removeDriver(driver: DriverInfo)
+  def addDriver(driver: DriverInfo): Unit = {
+    persist("driver_" + driver.id, driver)
+  }
+
+  def removeDriver(driver: DriverInfo): Unit = {
+    unpersist("driver_" + driver.id)
+  }
 
   /**
    * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
+  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
+    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
+  }
 
   def close() {}
 }
 
 private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
-  override def addApplication(app: ApplicationInfo) {}
-  override def removeApplication(app: ApplicationInfo) {}
-  override def addWorker(worker: WorkerInfo) {}
-  override def removeWorker(worker: WorkerInfo) {}
-  override def addDriver(driver: DriverInfo) {}
-  override def removeDriver(driver: DriverInfo) {}
+
+  override def persist(name: String, obj: Object): Unit = {}
 
   override def readPersistedData() = (Nil, Nil, Nil)
+
+  override def unpersist(name: String): Unit = {}
+
+  override def read[T: ClassTag](name: String): Seq[T] = Nil
+
 }
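
Note: the add*/remove*/readPersistedData methods are now concrete template methods over three abstract primitives, so an implementor supplies only persist, unpersist and read. A minimal in-memory sketch, not in the commit, that honors the sorted-by-id contract documented on readPersistedData:

import scala.collection.mutable
import scala.reflect.ClassTag

// Keeps recovery state in a map; everything is lost when the JVM exits,
// so this illustrates the contract's shape, not a real recovery store.
class InMemoryPersistenceEngine extends PersistenceEngine {
  private val store = new mutable.HashMap[String, Object]

  override def persist(name: String, obj: Object): Unit = store(name) = obj

  override def unpersist(name: String): Unit = store -= name

  override def read[T: ClassTag](name: String): Seq[T] =
    store.toSeq.filter(_._1.startsWith(name)).sortBy(_._1).map(_._2.asInstanceOf[T])
}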
