Skip to content

Commit 97beaf8

Browse files
committed
SPARK-5949 HighlyCompressedMapStatus needs more classes registered w/ kryo
1 parent e3a88d1 commit 97beaf8

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import com.esotericsoftware.kryo.{Kryo, KryoException}
2424
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
2525
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
2626
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
27+
import org.roaringbitmap.{ArrayContainer, RoaringArray, RoaringBitmap}
28+
2729

2830
import org.apache.spark._
2931
import org.apache.spark.api.python.PythonBroadcast
@@ -202,6 +204,12 @@ private[serializer] object KryoSerializer {
202204
classOf[GetBlock],
203205
classOf[CompressedMapStatus],
204206
classOf[HighlyCompressedMapStatus],
207+
classOf[RoaringBitmap],
208+
classOf[RoaringArray],
209+
classOf[RoaringArray.Element],
210+
classOf[Array[RoaringArray.Element]],
211+
classOf[ArrayContainer],
212+
classOf[Array[Short]],
205213
classOf[CompactBuffer[_]],
206214
classOf[BlockManagerId],
207215
classOf[Array[Byte]],

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.serializer
1919

20+
import org.apache.spark.scheduler.HighlyCompressedMapStatus
21+
import org.apache.spark.storage.BlockManagerId
22+
2023
import scala.collection.mutable
2124
import scala.reflect.ClassTag
2225

@@ -242,6 +245,16 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
242245
ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
243246
}
244247
}
248+
249+
test("registration of HighlyCompressedMapStatus") {
250+
val conf = new SparkConf(false)
251+
conf.set("spark.kryo.registrationRequired", "true")
252+
val hcmo = HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), Array(0l,2l,5l))
253+
val ser = new KryoSerializer(conf)
254+
val serInstance = ser.newInstance()
255+
serInstance.serialize(hcmo)
256+
257+
}
245258
}
246259

247260

0 commit comments

Comments
 (0)