core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-import java.nio.ByteBuffer
-
-import scala.reflect.ClassTag
-
+
+import akka.serialization.Serialization
+
 import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
+
+import scala.reflect.ClassTag
+
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +35,9 @@ import scala.reflect.ClassTag
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serializer)
+    val serialization: Serialization)
   extends PersistenceEngine with Logging {
 
-  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
   override def persist(name: String, obj: Object): Unit = {
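
With the SerializerInstance field gone, the engine now depends directly on an Akka Serialization extension, which is bound to an ActorSystem. A minimal sketch of the wiring (the system name and directory are illustrative, and the engine is private[spark], so this assumes Spark-internal code):

import akka.actor.ActorSystem
import akka.serialization.{Serialization, SerializationExtension}

// The Master obtains its instance as SerializationExtension(context.system);
// outside an actor, any ActorSystem works the same way.
val system = ActorSystem("sketch")  // illustrative name
val serialization: Serialization = SerializationExtension(system)

// Hand it to the engine exactly as the FILESYSTEM recovery path does.
val engine = new FileSystemPersistenceEngine("/tmp/spark-recovery", serialization)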
@@ -56,25 +56,27 @@ private[spark] class FileSystemPersistenceEngine(
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-
-    val out = serializer.serializeStream(new FileOutputStream(file))
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    val out = new FileOutputStream(file)
     try {
-      out.writeObject(value)
+      out.write(serialized)
     } finally {
       out.close()
     }
-
   }
 
-  def deserializeFromFile[T](file: File): T = {
+  private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
     try {
       dis.readFully(fileData)
     } finally {
       dis.close()
     }
-
-    serializer.deserializeStream(dis).readObject()
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
   }
 
 }
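
Both the write and read paths above reduce to the same Akka Serialization round trip: pick a serializer by value when writing, and by the ClassTag's runtime class when reading. A self-contained sketch; DriverState is a made-up payload standing in for ApplicationInfo, WorkerInfo, and DriverInfo:

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

import scala.reflect.ClassTag

// Case classes are Serializable, so Akka's default Java-serializer binding applies.
case class DriverState(id: String, attempts: Int)

object SerializationRoundTrip extends App {
  val system = ActorSystem("serialization-sketch")  // illustrative name
  val serialization = SerializationExtension(system)

  // Write path, as in serializeIntoFile: pick a serializer by value.
  val value = DriverState("driver-0", 1)
  val bytes: Array[Byte] = serialization.findSerializerFor(value).toBinary(value)

  // Read path, as in deserializeFromFile: pick a serializer by the ClassTag's
  // runtime class and cast the result back to T.
  def fromBytes[T](data: Array[Byte])(implicit m: ClassTag[T]): T = {
    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
    serialization.serializerFor(clazz).fromBinary(data).asInstanceOf[T]
  }

  assert(fromBytes[DriverState](bytes) == value)
  system.shutdown()  // Akka 2.3-era API, matching Spark at the time
}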
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (12 changes: 8 additions & 4 deletions)

@@ -30,6 +30,7 @@ import scala.util.Random
 import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,18 @@ private[spark] class Master(
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        val zkFactory =
+          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
         (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        val fsFactory =
+          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
         val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
-        val factory = clazz.getConstructor(conf.getClass)
-          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+          .newInstance(conf, SerializationExtension(context.system))
+          .asInstanceOf[StandaloneRecoveryModeFactory]
         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
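
For context, the match above is driven entirely by configuration. A sketch of the settings each branch reads; the directory, ZooKeeper addresses, and factory class are illustrative values, not defaults from the patch:

import org.apache.spark.SparkConf

val conf = new SparkConf()

// FILESYSTEM: FileSystemRecoveryModeFactory reads the recovery directory.
conf.set("spark.deploy.recoveryMode", "FILESYSTEM")
conf.set("spark.deploy.recoveryDirectory", "/var/lib/spark/recovery")

// ZOOKEEPER: state goes under spark.deploy.zookeeper.dir on the given ensemble.
// conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
// conf.set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181")
// conf.set("spark.deploy.zookeeper.dir", "/spark")

// CUSTOM: the Master reflectively calls a (SparkConf, Serialization) constructor.
// conf.set("spark.deploy.recoveryMode", "CUSTOM")
// conf.set("spark.deploy.recoveryMode.factory", "com.example.MyRecoveryModeFactory")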
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.JavaSerializer
 
 /**
  * ::DeveloperApi::
@@ -29,7 +30,7 @@ import org.apache.spark.serializer.JavaSerializer
  *
  */
 @DeveloperApi
-abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serialization) {
 
   /**
    * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
@@ -48,21 +49,21 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
    * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
    * recovery is made by restoring from filesystem.
    */
-private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) with Logging {
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
 
   def createPersistenceEngine() = {
     logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-    new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+    new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
   }
 
   def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
 }
 
-private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) {
-  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, serializer) {
+  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(conf, serializer)
 
   def createLeaderElectionAgent(master: LeaderElectable) =
     new ZooKeeperLeaderElectionAgent(master, conf)
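
Because StandaloneRecoveryModeFactory is a DeveloperApi, the CUSTOM branch in Master.scala can load a third-party implementation. A hedged sketch of a factory matching the new (SparkConf, Serialization) constructor; the class is hypothetical, and it sits in Spark's own package so it can reuse the private[spark] no-op engine and agent:

package org.apache.spark.deploy.master

import akka.serialization.Serialization

import org.apache.spark.SparkConf

// Hypothetical pluggable factory, selected via
//   spark.deploy.recoveryMode=CUSTOM
//   spark.deploy.recoveryMode.factory=org.apache.spark.deploy.master.NoOpRecoveryModeFactory
class NoOpRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  // No durable state: recovery is a no-op, mirroring the default branch in Master.
  def createPersistenceEngine() = new BlackHolePersistenceEngine()

  def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
}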
core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,27 +17,24 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
 
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.serializer.Serializer
-import java.nio.ByteBuffer
-
+import scala.reflect.ClassTag
 
 
-private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
+private[spark] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
   extends PersistenceEngine
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
-  val serializer = serialization.newInstance()
-
   SparkCuratorUtil.mkdir(zk, WORKING_DIR)
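
The engine keeps one persistent znode per persisted object under WORKING_DIR. For readers unfamiliar with Curator, a minimal standalone sketch of the same create/read pattern; the connection string and paths are illustrative, and Spark wraps the client setup in SparkCuratorUtil.newClient(conf):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

val zk = CuratorFrameworkFactory.newClient(
  "localhost:2181",                      // spark.deploy.zookeeper.url in Spark
  new ExponentialBackoffRetry(1000, 3))  // base sleep 1s, max 3 retries
zk.start()

// Write: one persistent znode per object, as in serializeIntoFile.
val bytes = "payload".getBytes("UTF-8")
zk.create().creatingParentsIfNeeded()
  .withMode(CreateMode.PERSISTENT)
  .forPath("/spark/master_status/app_1", bytes)

// Read: fetch the raw bytes back, as in deserializeFromFile.
val read: Array[Byte] = zk.getData().forPath("/spark/master_status/app_1")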
@@ -59,14 +56,17 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
   }
 
   private def serializeIntoFile(path: String, value: AnyRef) {
-    val serialized = serializer.serialize(value)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }
 
-  def deserializeFromFile[T](filename: String): Option[T] = {
+  def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
     try {
-      Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
+      Some(serializer.fromBinary(fileData).asInstanceOf[T])
     } catch {
       case e: Exception => {
         logWarning("Exception while reading persisted file, deleting", e)
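
On the read side the caller picks the target type with an explicit type argument, which materializes the implicit ClassTag. A rough call-site sketch, assuming an engine instance and an ApplicationInfo value appInfo are in scope, and that znodes follow the Master's app_ naming scheme:

// Write one persistent znode, then read it back as a typed value.
// deserializeFromFile[T] resolves its serializer from ClassTag[T].runtimeClass.
engine.persist("app_app-20150101000000-0000", appInfo)
val restored: Option[ApplicationInfo] =
  engine.deserializeFromFile[ApplicationInfo]("app_app-20150101000000-0000")
restored.foreach(app => println(s"recovered application ${app.id}"))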