
Commit da91ba7

Merge remote-tracking branch 'origin/master' into useIsolatedClient

Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala

2 parents: 5fe5894 + 845d1d4

106 files changed, +34965 -27 lines changed

(Large commits have some content hidden by default; only a subset of the changed files is shown below.)
core/src/main/java/org/apache/spark/annotation/Private.java

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * A class that is considered private to the internals of Spark -- there is a high likelihood
 * that it will be changed in future versions of Spark.
 *
 * This should be used only when the standard Scala / Java means of protecting classes are
 * insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation
 * in its place.
 *
 * NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
 * line of the comment must be ":: Private ::" with no trailing blank line. This is because
 * of the known issue that Scaladoc displays only either the annotation or the comment, whichever
 * comes first.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
  ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
public @interface Private {}
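
A hedged illustration (not part of this commit; the class below is invented for the example) of how the annotation and the ":: Private ::" Scaladoc convention described above would be used together from Scala:

import org.apache.spark.annotation.Private

/**
 * :: Private ::
 * A hypothetical internal helper. The ":: Private ::" marker is the first line of the
 * Scaladoc, with no trailing blank line, per the NOTE in the annotation's Javadoc.
 */
@Private
class InternalShuffleHelper {
  // Internal-only API: may change or disappear in any Spark release.
  def relocate(bytes: Array[Byte]): Array[Byte] = bytes
}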

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 7 additions & 0 deletions

@@ -125,6 +125,13 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
+    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }

 private[spark]
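
For context, auto-reset is a Kryo-level setting that can be flipped through a custom registrator (the test suite later in this commit uses a RegistratorWithoutAutoReset for exactly this). A minimal sketch, with an invented class name:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// With auto-reset disabled, Kryo keeps back-references to previously written objects,
// so an object's serialized bytes are no longer self-contained and relocation is unsafe.
class NoAutoResetRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.setAutoReset(false)
  }
}

// Wiring it up: with this conf, the lazy val above evaluates to false.
val conf = new SparkConf().set("spark.kryo.registrator", classOf[NoAutoResetRegistrator].getName)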

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 34 additions & 1 deletion

@@ -23,7 +23,7 @@
 import scala.reflect.ClassTag

 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Private}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}

 /**
@@ -63,6 +63,39 @@ abstract class Serializer {

   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * :: Private ::
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in serialization stream output is equivalent to having re-ordered those elements prior to
+   * serializing them. More specifically, the following should hold if a serializer supports
+   * relocation:
+   *
+   * {{{
+   * serOut.open()
+   * position = 0
+   * serOut.write(obj1)
+   * serOut.flush()
+   * position = # of bytes written to stream so far
+   * obj1Bytes = output[0:position-1]
+   * serOut.write(obj2)
+   * serOut.flush()
+   * position2 = # of bytes written to stream so far
+   * obj2Bytes = output[position:position2-1]
+   * serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+   * }}}
+   *
+   * In general, this property should hold for serializers that are stateless and that do not
+   * write special metadata at the beginning or end of the serialization stream.
+   *
+   * This API is private to Spark; this method should not be overridden in third-party subclasses
+   * or called in user code and is subject to removal in future Spark releases.
+   *
+   * See SPARK-7311 for more details.
+   */
+  @Private
+  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
 }
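
The pseudocode contract above can be exercised directly. A minimal, self-contained sketch (not from the commit) using KryoSerializer with default settings, which reports relocation support:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val ser = new KryoSerializer(new SparkConf()).newInstance()
val baos = new ByteArrayOutputStream()
val serOut = ser.serializeStream(baos)

serOut.writeObject("obj1")
serOut.flush()
val position = baos.size()                                   // bytes written so far
val obj1Bytes = baos.toByteArray.slice(0, position)

serOut.writeObject("obj2")
serOut.flush()
val obj2Bytes = baos.toByteArray.slice(position, baos.size())
serOut.close()

// Reordering the serialized bytes must be equivalent to reordering the objects:
val serIn = ser.deserializeStream(new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
assert(serIn.asIterator.toSeq == Seq("obj2", "obj1"))
serIn.close()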

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 2 deletions

@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects

   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
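
In effect, ExternalSorter now asks the serializer whether its bytes are relocatable instead of hard-coding a Kryo type check. A hedged configuration sketch (the config keys come from the diff above; everything else is illustrative):

import org.apache.spark.SparkConf

// The serialized pair buffer is used only when all three conditions hold: no ordering is
// defined, the feature flag is enabled, and the serializer reports relocation support.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.shuffle.sort.serializeMapOutputs", "true") // default, per the diff above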
core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala

Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import scala.util.Random

import org.scalatest.{Assertions, FunSuite}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset

/**
 * Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
 * describe properties of the serialized stream, such as
 * [[Serializer.supportsRelocationOfSerializedObjects]].
 */
class SerializerPropertiesSuite extends FunSuite {

  import SerializerPropertiesSuite._

  test("JavaSerializer does not support relocation") {
    // Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
    // full class name the first time an object is written to an output stream, but subsequent
    // references to the class write a more compact identifier; this prevents relocation.
    val ser = new JavaSerializer(new SparkConf())
    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
  }

  test("KryoSerializer supports relocation when auto-reset is enabled") {
    val ser = new KryoSerializer(new SparkConf)
    assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
  }

  test("KryoSerializer does not support relocation when auto-reset is disabled") {
    val conf = new SparkConf().set("spark.kryo.registrator",
      classOf[RegistratorWithoutAutoReset].getName)
    val ser = new KryoSerializer(conf)
    assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
  }

}

object SerializerPropertiesSuite extends Assertions {

  def generateRandomItem(rand: Random): Any = {
    val randomFunctions: Seq[() => Any] = Seq(
      () => rand.nextInt(),
      () => rand.nextString(rand.nextInt(10)),
      () => rand.nextDouble(),
      () => rand.nextBoolean(),
      () => (rand.nextInt(), rand.nextString(rand.nextInt(10))),
      () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))),
      () => {
        val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10)))
        (x, x)
      }
    )
    randomFunctions(rand.nextInt(randomFunctions.size)).apply()
  }

  def testSupportsRelocationOfSerializedObjects(
      serializer: Serializer,
      generateRandomItem: Random => Any): Unit = {
    if (!serializer.supportsRelocationOfSerializedObjects) {
      return
    }
    val NUM_TRIALS = 5
    val rand = new Random(42)
    for (_ <- 1 to NUM_TRIALS) {
      val items = {
        // Make sure that we have duplicate occurrences of the same object in the stream:
        val randomItems = Seq.fill(10)(generateRandomItem(rand))
        randomItems ++ randomItems.take(5)
      }
      val baos = new ByteArrayOutputStream()
      val serStream = serializer.newInstance().serializeStream(baos)
      def serializeItem(item: Any): Array[Byte] = {
        val itemStartOffset = baos.toByteArray.length
        serStream.writeObject(item)
        serStream.flush()
        val itemEndOffset = baos.toByteArray.length
        baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone()
      }
      val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = {
        val serItems = items.map {
          item => (item, serializeItem(item))
        }
        serStream.close()
        rand.shuffle(serItems)
      }
      val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray
      val deserializedItemsStream = serializer.newInstance().deserializeStream(
        new ByteArrayInputStream(reorderedSerializedData))
      assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1))
      deserializedItemsStream.close()
    }
  }
}

private case class MyCaseClass(foo: Int, bar: String)
