From 71845e3d7805741dd830cdd66d8a49c24f2bdc95 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 20 May 2015 12:55:03 -0700 Subject: [PATCH 1/2] Add failing regression test to trigger Kryo re-use bug --- .../serializer/KryoSerializerSuite.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c7369de24b81f..c8c896188ee0a 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.serializer +import java.io.ByteArrayOutputStream + import scala.collection.mutable import scala.reflect.ClassTag @@ -319,6 +321,35 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] assert(!ser2.getAutoReset) } + + private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = { + val conf = new SparkConf(loadDefaults = false) + .set("spark.kryo.referenceTracking", referenceTracking.toString) + if (!autoReset) { + conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName) + } + val ser = new KryoSerializer(conf) + val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance] + assert (serInstance.getAutoReset() === autoReset) + val obj = ("Hello", "World") + def serializeObjects(): Array[Byte] = { + val baos = new ByteArrayOutputStream() + val serStream = serInstance.serializeStream(baos) + serStream.writeObject(obj) + serStream.writeObject(obj) + serStream.close() + baos.toByteArray + } + val output1: Array[Byte] = serializeObjects() + val output2: Array[Byte] = serializeObjects() + assert (output1 === output2) + } + + for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { + test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { + testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + } + } } From e19726d56eaeb8ad5558f09a2f3d3ec6e70bd2f7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 20 May 2015 13:23:25 -0700 Subject: [PATCH 2/2] Add fix for SPARK-7766. --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 ++ .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 64ba27f34d2f1..217957963437d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) try { kryo.writeClassAndObject(output, t) } catch { @@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ } override def serializeStream(s: OutputStream): SerializationStream = { + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) new KryoSerializationStream(kryo, s) } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c8c896188ee0a..0bd91a8dba2ab 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -345,6 +345,8 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert (output1 === output2) } + // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling + // reference-tracking would lead to corrupted output when serializer instances are re-used for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)