Skip to content

Commit c062224

Browse files
zsxwing authored and pwendell committed
[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T]
Added a ClassTag parameter to CompactBuffer. So CompactBuffer[T] can create primitive arrays for primitive types. It will reduce the memory usage for primitive types significantly and only pay minor performance lost. Here is my test code: ```Scala // Call org.apache.spark.util.SizeEstimator.estimate def estimateSize(obj: AnyRef): Long = { val c = Class.forName("org.apache.spark.util.SizeEstimator$") val f = c.getField("MODULE$") val o = f.get(c) val m = c.getMethod("estimate", classOf[Object]) m.setAccessible(true) m.invoke(o, obj).asInstanceOf[Long] } sc.parallelize(1 to 10000).groupBy(_ => 1).foreach { case (k, v) => println(v.getClass() + " size: " + estimateSize(v)) } ``` Using the previous CompactBuffer outputed ``` class org.apache.spark.util.collection.CompactBuffer size: 313358 ``` Using the new CompactBuffer outputed ``` class org.apache.spark.util.collection.CompactBuffer size: 65712 ``` In this case, the new `CompactBuffer` only used 20% memory of the previous one. It's really helpful for `groupByKey` when using a primitive value. Author: zsxwing <[email protected]> Closes #3378 from zsxwing/SPARK-4505 and squashes the following commits: 4abdbba [zsxwing] Add a ClassTag parameter to reduce the memory usage of CompactBuffer[T] when T is a primitive type
1 parent 938dc14 commit c062224

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.util.collection
1919

20+
import scala.reflect.ClassTag
21+
2022
/**
2123
* An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
2224
* ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
@@ -25,7 +27,7 @@ package org.apache.spark.util.collection
2527
* entries than that. This makes it more efficient for operations like groupBy where we expect
2628
* some keys to have very few elements.
2729
*/
28-
private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
30+
private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
2931
// First two elements
3032
private var element0: T = _
3133
private var element1: T = _
@@ -34,7 +36,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
3436
private var curSize = 0
3537

3638
// Array for extra elements
37-
private var otherElements: Array[AnyRef] = null
39+
private var otherElements: Array[T] = null
3840

3941
def apply(position: Int): T = {
4042
if (position < 0 || position >= curSize) {
@@ -45,7 +47,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
4547
} else if (position == 1) {
4648
element1
4749
} else {
48-
otherElements(position - 2).asInstanceOf[T]
50+
otherElements(position - 2)
4951
}
5052
}
5153

@@ -58,7 +60,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
5860
} else if (position == 1) {
5961
element1 = value
6062
} else {
61-
otherElements(position - 2) = value.asInstanceOf[AnyRef]
63+
otherElements(position - 2) = value
6264
}
6365
}
6466

@@ -72,7 +74,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
7274
curSize = 2
7375
} else {
7476
growToSize(curSize + 1)
75-
otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
77+
otherElements(newIndex - 2) = value
7678
}
7779
this
7880
}
@@ -139,7 +141,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
139141
newArrayLen = Int.MaxValue - 2
140142
}
141143
}
142-
val newArray = new Array[AnyRef](newArrayLen)
144+
val newArray = new Array[T](newArrayLen)
143145
if (otherElements != null) {
144146
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
145147
}
@@ -150,9 +152,9 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
150152
}
151153

152154
private[spark] object CompactBuffer {
153-
def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
155+
def apply[T: ClassTag](): CompactBuffer[T] = new CompactBuffer[T]
154156

155-
def apply[T](value: T): CompactBuffer[T] = {
157+
def apply[T: ClassTag](value: T): CompactBuffer[T] = {
156158
val buf = new CompactBuffer[T]
157159
buf += value
158160
}

0 commit comments

Comments (0)