Skip to content

Commit b5dc339

Browse files
committed
Merge pull request alteryx#70 from rxin/hash1
Fast, memory-efficient hash set, hash table implementations optimized for primitive data types. This pull request adds two hash table implementations optimized for primitive data types. For primitive types, the new hash tables are much faster than the current Spark AppendOnlyMap (3X faster - note that the current AppendOnlyMap is already much better than the Java map) while using much less space (1/4 of the space). Details: This PR first adds an open hash set implementation (OpenHashSet) optimized for primitive types (using Scala's specialization feature). This OpenHashSet is designed to serve as a building block for more advanced structures. It is currently used to build the following two hash tables, but can be used in the future to build multi-valued hash tables as well (GraphX has this use case). Note that there are some peculiarities in the code for working around some Scala compiler bugs. Building on top of OpenHashSet, this PR adds two different hash table implementations: 1. OpenHashMap: for nullable keys, optional specialization for primitive values 2. PrimitiveKeyOpenHashMap: for primitive keys that are not nullable, and optional specialization for primitive values I tested the update speed of these two implementations using the changeValue function (which is what Aggregator and cogroup would use). Runtime relative to AppendOnlyMap for inserting 10 million items: Int to Int: ~30% java.lang.Integer to java.lang.Integer: ~100% Int to java.lang.Integer: ~50% java.lang.Integer to Int: ~85%
2 parents 41ead7a + eb5f8a3 commit b5dc339

File tree

9 files changed

+1108
-7
lines changed

9 files changed

+1108
-7
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
package org.apache.spark.util
1919

2020
import java.io._
21-
import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
21+
import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
2222
import java.util.{Locale, Random, UUID}
23-
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
24-
import java.util.regex.Pattern
23+
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
2524

2625
import scala.collection.Map
27-
import scala.collection.mutable.{ArrayBuffer, HashMap}
26+
import scala.collection.mutable.ArrayBuffer
2827
import scala.collection.JavaConversions._
2928
import scala.io.Source
3029

@@ -36,8 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
3635
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
3736
import org.apache.spark.deploy.SparkHadoopUtil
3837
import java.nio.ByteBuffer
39-
import org.apache.spark.{SparkEnv, SparkException, Logging}
40-
import java.util.ConcurrentModificationException
38+
import org.apache.spark.{SparkException, Logging}
4139

4240

4341
/**
@@ -149,7 +147,7 @@ private[spark] object Utils extends Logging {
149147
return buf
150148
}
151149

152-
private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
150+
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
153151

154152
// Register the path to be deleted via shutdown hook
155153
def registerShutdownDeleteDir(file: File) {
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util.collection
19+
20+
21+
/**
 * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
 * safety/bound checking.
 */
class BitSet(numBits: Int) {

  // Backing storage: one 64-bit word per 64 bits requested. No bounds checking is
  // performed on access — callers must keep indices within [0, numBits).
  private[this] val data = new Array[Long](bit2words(numBits))
  private[this] val dataLength = data.length

  /**
   * Sets the bit at the specified index to true.
   * @param index the bit index
   */
  def set(index: Int) {
    // index >> 6 selects the word; the low six bits select the bit within that word.
    data(index >> 6) |= (1L << (index & 0x3f))
  }

  /**
   * Return the value of the bit with the specified index. The value is true if the bit with
   * the index is currently set in this BitSet; otherwise, the result is false.
   *
   * @param index the bit index
   * @return the value of the bit with the specified index
   */
  def get(index: Int): Boolean = {
    val mask = 1L << (index & 0x3f)              // mod 64 and shift
    (data(index >> 6) & mask) != 0L              // div by 64 and mask
  }

  /** Return the number of bits set to true in this BitSet. */
  def cardinality(): Int = {
    var total = 0
    var w = 0
    while (w < dataLength) {
      total += java.lang.Long.bitCount(data(w))
      w += 1
    }
    total
  }

  /**
   * Returns the index of the first bit that is set to true that occurs on or after the
   * specified starting index. If no such bit exists then -1 is returned.
   *
   * To iterate over the true bits in a BitSet, use the following loop:
   *
   *  for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
   *    // operate on index i here
   *  }
   *
   * @param fromIndex the index to start checking from (inclusive)
   * @return the index of the next set bit, or -1 if there is no such bit
   */
  def nextSetBit(fromIndex: Int): Int = {
    var w = fromIndex >> 6
    if (w >= dataLength) {
      return -1
    }

    // First, look at the remainder of the word containing fromIndex. The arithmetic
    // shift is safe here: sign-extended bits can only appear above the lowest real
    // set bit, and a set sign bit implies bit 63 itself is set.
    val offset = fromIndex & 0x3f
    val firstWord = data(w) >> offset
    if (firstWord != 0) {
      return (w << 6) + offset + java.lang.Long.numberOfTrailingZeros(firstWord)
    }

    // Then scan the remaining whole words for the first non-zero one.
    w += 1
    while (w < dataLength) {
      val cur = data(w)
      if (cur != 0) {
        return (w << 6) + java.lang.Long.numberOfTrailingZeros(cur)
      }
      w += 1
    }

    -1
  }

  /** Return the number of longs it would take to hold numBits. */
  private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util.collection
19+
20+
21+
/**
 * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
 * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
 * space overhead.
 *
 * Under the hood, it uses our OpenHashSet implementation.
 */
private[spark]
class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
    initialCapacity: Int)
  extends Iterable[(K, V)]
  with Serializable {

  /** Construct a map with the default initial capacity of 64 entries. */
  def this() = this(64)

  // The key set assigns each non-null key a slot; _values is kept index-aligned with it,
  // so _values(pos) holds the value for the key stored at pos.
  protected var _keySet = new OpenHashSet[K](initialCapacity)

  // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
  // bug that would generate two arrays (one for Object and one for specialized T).
  private var _values: Array[V] = _
  _values = new Array[V](_keySet.capacity)

  // Holds the pre-resize values array while a rehash is in flight so `move` (below) can copy
  // entries across; cleared after each update. Transient: not carried through serialization.
  @transient private var _oldValues: Array[V] = null

  // Treat the null key differently so we can use nulls in "data" to represent empty items.
  private var haveNullValue = false
  private var nullValue: V = null.asInstanceOf[V]

  // Size counts the null-key entry separately, since it lives outside _keySet.
  override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size

  /**
   * Get the value for a given key. Returns null (cast to V) when the key is absent — note
   * this makes an absent key indistinguishable from a key explicitly mapped to null.
   */
  def apply(k: K): V = {
    if (k == null) {
      nullValue
    } else {
      val pos = _keySet.getPos(k)  // negative position indicates the key is not present
      if (pos < 0) {
        null.asInstanceOf[V]
      } else {
        _values(pos)
      }
    }
  }

  /** Set the value for a key */
  def update(k: K, v: V) {
    if (k == null) {
      haveNullValue = true
      nullValue = v
    } else {
      // addWithoutResize may set the high NONEXISTENCE_MASK bit in the returned position
      // (per its usage in changeValue below); mask it off to get the raw slot index.
      val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
      _values(pos) = v
      // Resize after the write: rehashIfNeeded calls our grow/move callbacks to carry
      // the values over to the resized array.
      _keySet.rehashIfNeeded(k, grow, move)
      _oldValues = null
    }
  }

  /**
   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
   * set its value to mergeValue(oldValue).
   *
   * defaultValue is by-name, so it is only evaluated when the key is actually new.
   *
   * @return the newly updated value.
   */
  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
    if (k == null) {
      if (haveNullValue) {
        nullValue = mergeValue(nullValue)
      } else {
        haveNullValue = true
        nullValue = defaultValue
      }
      nullValue
    } else {
      val pos = _keySet.addWithoutResize(k)
      if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
        // Key was newly inserted: store the default, then rehash if the set got too full.
        val newValue = defaultValue
        _values(pos & OpenHashSet.POSITION_MASK) = newValue
        _keySet.rehashIfNeeded(k, grow, move)
        newValue
      } else {
        // Existing key: the mask bit is clear, so pos can index _values directly.
        _values(pos) = mergeValue(_values(pos))
        _values(pos)
      }
    }
  }

  // Iterates the optional null-key entry first, then the entries stored in _keySet.
  override def iterator = new Iterator[(K, V)] {
    var pos = -1  // position -1 stands for the (optional) null-key entry
    var nextPair: (K, V) = computeNextPair()

    /** Get the next value we should return from next(), or null if we're finished iterating */
    def computeNextPair(): (K, V) = {
      if (pos == -1) {    // Treat position -1 as looking at the null value
        if (haveNullValue) {
          pos += 1
          return (null.asInstanceOf[K], nullValue)
        }
        pos += 1
      }
      pos = _keySet.nextPos(pos)
      if (pos >= 0) {
        val ret = (_keySet.getValue(pos), _values(pos))
        pos += 1
        ret
      } else {
        null
      }
    }

    def hasNext = nextPair != null

    def next() = {
      val pair = nextPair
      nextPair = computeNextPair()
      pair
    }
  }

  // The following member variables are declared as protected instead of private for the
  // specialization to work (specialized class extends the non-specialized one and needs access
  // to the "private" variables).
  // They also should have been val's. We use var's because there is a Scala compiler bug that
  // would throw illegal access error at runtime if they are declared as val's.

  // Callback for rehashIfNeeded: allocate a larger values array, stashing the old one
  // in _oldValues so `move` can copy entries across.
  protected var grow = (newCapacity: Int) => {
    _oldValues = _values
    _values = new Array[V](newCapacity)
  }

  // Callback for rehashIfNeeded: copy one entry from its old slot to its new slot.
  protected var move = (oldPos: Int, newPos: Int) => {
    _values(newPos) = _oldValues(oldPos)
  }
}

0 commit comments

Comments
 (0)