Commit 29880da

[SPARK-4377] Fixed serialization issue by switching to the Akka-provided serializer; there is no way around this for deserializing ActorRef(s).
1 parent: 423baea

4 files changed: +40 -35 lines
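
The change matters because the Master's recovery state (e.g. WorkerInfo, ApplicationInfo) holds ActorRefs, and an ActorRef can only be rebuilt by a serializer bound to a live ActorSystem; Spark's own serializer has no such binding, which is what this commit works around. A minimal sketch of the round trip through Akka's extension, not taken from the patch (Akka 2.3-era API; the actor and names are illustrative):

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.serialization.SerializationExtension

object ActorRefRoundTrip extends App {
  val system = ActorSystem("demo")
  val serialization = SerializationExtension(system)

  // A throwaway no-op actor, just to obtain an ActorRef.
  val ref: ActorRef = system.actorOf(Props(new Actor {
    def receive = { case _ => }
  }), name = "worker")

  // Akka picks the serializer registered for the value's runtime class...
  val serializer = serialization.findSerializerFor(ref)
  val bytes: Array[Byte] = serializer.toBinary(ref)

  // ...and on the way back resolves the serialized actor path against the
  // owning ActorSystem, which a standalone Java serializer cannot do.
  val restored = serializer.fromBinary(bytes).asInstanceOf[ActorRef]
  assert(restored == ref)

  system.shutdown()
}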

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 14 additions & 12 deletions
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import akka.serialization.Serialization
 
 import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
 
-import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +35,9 @@ import scala.reflect.ClassTag
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serializer)
+    val serialization: Serialization)
   extends PersistenceEngine with Logging {
 
-  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
   override def persist(name: String, obj: Object): Unit = {
@@ -56,25 +56,27 @@ private[spark] class FileSystemPersistenceEngine(
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-
-    val out = serializer.serializeStream(new FileOutputStream(file))
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    val out = new FileOutputStream(file)
     try {
-      out.writeObject(value)
+      out.write(serialized)
     } finally {
       out.close()
     }
-
   }
 
-  def deserializeFromFile[T](file: File): T = {
+  private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
     try {
       dis.readFully(fileData)
     } finally {
       dis.close()
     }
-
-    serializer.deserializeStream(dis).readObject()
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
   }
+
 }
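
The deserialize side now threads an implicit ClassTag so that T's runtime class survives erasure; Serialization.serializerFor needs that concrete Class to choose a deserializer. The same pattern in isolation, as a hypothetical standalone helper (not part of the patch):

import scala.reflect.ClassTag

import akka.serialization.Serialization

// The compiler materializes a ClassTag[T] at each call site, so the erased
// type parameter is recoverable at runtime and can be handed to serializerFor.
def fromBytes[T](serialization: Serialization, bytes: Array[Byte])(implicit m: ClassTag[T]): T = {
  val clazz = m.runtimeClass.asInstanceOf[Class[T]]
  serialization.serializerFor(clazz).fromBinary(bytes).asInstanceOf[T]
}

// A call site pins T explicitly, e.g. fromBytes[WorkerInfo](serialization, bytes).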

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 4 deletions
@@ -30,6 +30,7 @@ import scala.util.Random
 import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,16 @@ private[spark] class Master(
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        val zkFactory = new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
         (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        val fsFactory = new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
         val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
-        val factory = clazz.getConstructor(conf.getClass)
-          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+          .newInstance(conf, SerializationExtension(context.system))
+          .asInstanceOf[StandaloneRecoveryModeFactory]
         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
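
Class.getConstructor matches parameter types exactly, so a factory plugged in through spark.deploy.recoveryMode.factory must declare the same (SparkConf, Serialization) parameter list that the lookup names. For reference, a sketch of the equivalent lookup written with classOf (note that Serialization.getClass in the hunk above evaluates to the class of the Serialization companion object, whereas classOf[Serialization] denotes the Serialization class itself; hypothetical helper, not from the patch):

import akka.serialization.Serialization

import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.StandaloneRecoveryModeFactory

// Reflectively load and instantiate a user-supplied recovery-mode factory.
// The configured class must expose a (SparkConf, Serialization) constructor.
def loadFactory(conf: SparkConf, serialization: Serialization): StandaloneRecoveryModeFactory = {
  val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
  clazz.getConstructor(classOf[SparkConf], classOf[Serialization])
    .newInstance(conf, serialization)
    .asInstanceOf[StandaloneRecoveryModeFactory]
}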

core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala

Lines changed: 9 additions & 8 deletions
@@ -17,9 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.JavaSerializer
 
 /**
  * ::DeveloperApi::
@@ -29,7 +30,7 @@ import org.apache.spark.serializer.JavaSerializer
  *
  */
 @DeveloperApi
-abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serialization) {
 
   /**
    * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
@@ -48,21 +49,21 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
  * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
  * recovery is made by restoring from filesystem.
  */
-private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) with Logging {
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf, s: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, s) with Logging {
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
 
   def createPersistenceEngine() = {
     logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-    new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+    new FileSystemPersistenceEngine(RECOVERY_DIR, s)
   }
 
   def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
 }
 
-private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) {
-  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf, s: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, s) {
+  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(conf, s)
 
   def createLeaderElectionAgent(master: LeaderElectable) =
     new ZooKeeperLeaderElectionAgent(master, conf)
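
With the widened constructor, every concrete factory receives the Serialization extension from the Master and forwards it to its persistence engine instead of building a Spark JavaSerializer of its own. A minimal hypothetical factory against the new contract (illustrative names; it sits in the org.apache.spark.deploy.master package because the engine classes are private[spark]):

package org.apache.spark.deploy.master

import akka.serialization.Serialization

import org.apache.spark.SparkConf

// Demo factory wired like FileSystemRecoveryModeFactory: the Serialization
// handle is passed straight through to the persistence engine.
class DemoRecoveryModeFactory(conf: SparkConf, serialization: Serialization)
  extends StandaloneRecoveryModeFactory(conf, serialization) {

  // A real factory would read the directory from conf.
  def createPersistenceEngine() =
    new FileSystemPersistenceEngine("/tmp/spark-recovery", serialization)

  // Single eternal leader, as in FILESYSTEM mode.
  def createLeaderElectionAgent(master: LeaderElectable) =
    new MonarchyLeaderAgent(master)
}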

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala

Lines changed: 11 additions & 11 deletions
@@ -17,27 +17,24 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
 
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.serializer.Serializer
-import java.nio.ByteBuffer
 
-import scala.reflect.ClassTag
 
-
-private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
+private[spark] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
   extends PersistenceEngine
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
-  val serializer = serialization.newInstance()
-
   SparkCuratorUtil.mkdir(zk, WORKING_DIR)
 
 
@@ -59,14 +56,17 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, c
   }
 
   private def serializeIntoFile(path: String, value: AnyRef) {
-    val serialized = serializer.serialize(value)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
  }
 
-  def deserializeFromFile[T](filename: String): Option[T] = {
+  def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
     try {
-      Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
+      Some(serializer.fromBinary(fileData).asInstanceOf[T])
     } catch {
       case e: Exception => {
         logWarning("Exception while reading persisted file, deleting", e)
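
As an aside, Akka's Serialization also exposes Try-returning convenience methods that locate the serializer internally, so the explicit try/catch above could equally be expressed with them. A sketch (hypothetical helpers, not what the patch uses):

import scala.util.Try

import akka.serialization.Serialization

// serialize picks findSerializerFor(value) internally and wraps the result.
def write(serialization: Serialization, value: AnyRef): Try[Array[Byte]] =
  serialization.serialize(value)

// deserialize resolves serializerFor(clazz) and casts the result to T.
def read[T](serialization: Serialization, bytes: Array[Byte], clazz: Class[T]): Try[T] =
  serialization.deserialize(bytes, clazz)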
