23
23
import java .util .LinkedList ;
24
24
import java .util .List ;
25
25
26
+ import com .google .common .annotations .VisibleForTesting ;
27
+
26
28
import org .apache .spark .unsafe .*;
27
29
import org .apache .spark .unsafe .array .ByteArrayMethods ;
28
30
import org .apache .spark .unsafe .array .LongArray ;
36
38
* This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
37
39
* which is guaranteed to exhaust the space.
38
40
* <p>
39
- * The map can support up to 2^31 keys because we use 32 bit MurmurHash. If the key cardinality is
40
- * higher than this, you should probably be using sorting instead of hashing for better cache
41
- * locality.
41
+ * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
42
+ * probably be using sorting instead of hashing for better cache locality.
42
43
* <p>
43
44
* This class is not thread safe.
44
45
*/
@@ -48,6 +49,11 @@ public final class BytesToBytesMap {
48
49
49
50
private static final HashMapGrowthStrategy growthStrategy = HashMapGrowthStrategy .DOUBLING ;
50
51
52
+ /**
53
+ * Special record length that is placed after the last record in a data page.
54
+ */
55
+ private static final int END_OF_PAGE_MARKER = -1 ;
56
+
51
57
private final TaskMemoryManager memoryManager ;
52
58
53
59
/**
@@ -64,7 +70,7 @@ public final class BytesToBytesMap {
64
70
65
71
/**
66
72
* Offset into `currentDataPage` that points to the location where new data can be inserted into
67
- * the page.
73
+ * the page. This does not incorporate the page's base offset.
68
74
*/
69
75
private long pageCursor = 0 ;
70
76
@@ -74,6 +80,15 @@ public final class BytesToBytesMap {
74
80
*/
75
81
private static final long PAGE_SIZE_BYTES = 1L << 26 ; // 64 megabytes
76
82
83
+ /**
84
+ * The maximum number of keys that BytesToBytesMap supports. The hash table has to be
85
+ * power-of-2-sized and its backing Java array can contain at most (1 << 30) elements, since
86
+ * that's the largest power-of-2 that's less than Integer.MAX_VALUE. We need two long array
87
+ * entries per key, giving us a maximum capacity of (1 << 29).
88
+ */
89
+ @ VisibleForTesting
90
+ static final int MAX_CAPACITY = (1 << 29 );
91
+
77
92
// This choice of page table size and page size means that we can address up to 500 gigabytes
78
93
// of memory.
79
94
@@ -143,6 +158,13 @@ public BytesToBytesMap(
143
158
this .loadFactor = loadFactor ;
144
159
this .loc = new Location ();
145
160
this .enablePerfMetrics = enablePerfMetrics ;
161
+ if (initialCapacity <= 0 ) {
162
+ throw new IllegalArgumentException ("Initial capacity must be greater than 0" );
163
+ }
164
+ if (initialCapacity > MAX_CAPACITY ) {
165
+ throw new IllegalArgumentException (
166
+ "Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY );
167
+ }
146
168
allocate (initialCapacity );
147
169
}
148
170
@@ -162,6 +184,55 @@ public BytesToBytesMap(
162
184
*/
163
185
public int size() {
  return size;
}
164
186
187
+ private static final class BytesToBytesMapIterator implements Iterator <Location > {
188
+
189
+ private final int numRecords ;
190
+ private final Iterator <MemoryBlock > dataPagesIterator ;
191
+ private final Location loc ;
192
+
193
+ private int currentRecordNumber = 0 ;
194
+ private Object pageBaseObject ;
195
+ private long offsetInPage ;
196
+
197
+ BytesToBytesMapIterator (int numRecords , Iterator <MemoryBlock > dataPagesIterator , Location loc ) {
198
+ this .numRecords = numRecords ;
199
+ this .dataPagesIterator = dataPagesIterator ;
200
+ this .loc = loc ;
201
+ if (dataPagesIterator .hasNext ()) {
202
+ advanceToNextPage ();
203
+ }
204
+ }
205
+
206
+ private void advanceToNextPage () {
207
+ final MemoryBlock currentPage = dataPagesIterator .next ();
208
+ pageBaseObject = currentPage .getBaseObject ();
209
+ offsetInPage = currentPage .getBaseOffset ();
210
+ }
211
+
212
+ @ Override
213
+ public boolean hasNext () {
214
+ return currentRecordNumber != numRecords ;
215
+ }
216
+
217
+ @ Override
218
+ public Location next () {
219
+ int keyLength = (int ) PlatformDependent .UNSAFE .getLong (pageBaseObject , offsetInPage );
220
+ if (keyLength == END_OF_PAGE_MARKER ) {
221
+ advanceToNextPage ();
222
+ keyLength = (int ) PlatformDependent .UNSAFE .getLong (pageBaseObject , offsetInPage );
223
+ }
224
+ loc .with (pageBaseObject , offsetInPage );
225
+ offsetInPage += 8 + 8 + keyLength + loc .getValueLength ();
226
+ currentRecordNumber ++;
227
+ return loc ;
228
+ }
229
+
230
+ @ Override
231
+ public void remove () {
232
+ throw new UnsupportedOperationException ();
233
+ }
234
+ }
235
+
165
236
/**
166
237
* Returns an iterator for iterating over the entries of this map.
167
238
*
@@ -171,27 +242,7 @@ public BytesToBytesMap(
171
242
* `lookup()`, the behavior of the returned iterator is undefined.
172
243
*/
173
244
public Iterator <Location > iterator () {
174
- return new Iterator <Location >() {
175
-
176
- private int nextPos = bitset .nextSetBit (0 );
177
-
178
- @ Override
179
- public boolean hasNext () {
180
- return nextPos != -1 ;
181
- }
182
-
183
- @ Override
184
- public Location next () {
185
- final int pos = nextPos ;
186
- nextPos = bitset .nextSetBit (nextPos + 1 );
187
- return loc .with (pos , 0 , true );
188
- }
189
-
190
- @ Override
191
- public void remove () {
192
- throw new UnsupportedOperationException ();
193
- }
194
- };
245
+ return new BytesToBytesMapIterator (size , dataPages .iterator (), loc );
195
246
}
196
247
197
248
/**
@@ -268,8 +319,11 @@ public final class Location {
268
319
private int valueLength ;
269
320
270
321
private void updateAddressesAndSizes (long fullKeyAddress ) {
271
- final Object page = memoryManager .getPage (fullKeyAddress );
272
- final long keyOffsetInPage = memoryManager .getOffsetInPage (fullKeyAddress );
322
+ updateAddressesAndSizes (
323
+ memoryManager .getPage (fullKeyAddress ), memoryManager .getOffsetInPage (fullKeyAddress ));
324
+ }
325
+
326
+ private void updateAddressesAndSizes (Object page , long keyOffsetInPage ) {
273
327
long position = keyOffsetInPage ;
274
328
keyLength = (int ) PlatformDependent .UNSAFE .getLong (page , position );
275
329
position += 8 ; // word used to store the key size
@@ -291,6 +345,12 @@ Location with(int pos, int keyHashcode, boolean isDefined) {
291
345
return this ;
292
346
}
293
347
348
+ Location with (Object page , long keyOffsetInPage ) {
349
+ this .isDefined = true ;
350
+ updateAddressesAndSizes (page , keyOffsetInPage );
351
+ return this ;
352
+ }
353
+
294
354
/**
295
355
* Returns true if the key is defined at this position, and false otherwise.
296
356
*/
@@ -345,6 +405,8 @@ public int getValueLength() {
345
405
* <p>
346
406
* It is only valid to call this method immediately after calling `lookup()` using the same key.
347
407
* <p>
408
+ * The key and value must be word-aligned (that is, their sizes must be multiples of 8).
409
+ * <p>
348
410
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
349
411
* will return information on the data stored by this `putNewKey` call.
350
412
* <p>
@@ -370,17 +432,27 @@ public void putNewKey(
370
432
isDefined = true ;
371
433
assert (keyLengthBytes % 8 == 0 );
372
434
assert (valueLengthBytes % 8 == 0 );
435
+ if (size == MAX_CAPACITY ) {
436
+ throw new IllegalStateException ("BytesToBytesMap has reached maximum capacity" );
437
+ }
373
438
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
374
439
// the key address instead of storing the absolute address of the value, the key and value
375
440
// must be stored in the same memory page.
376
441
// (8 byte key length) (key) (8 byte value length) (value)
377
442
final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes ;
378
- assert (requiredSize <= PAGE_SIZE_BYTES );
443
+ assert (requiredSize <= PAGE_SIZE_BYTES - 8 ); // Reserve 8 bytes for the end-of-page marker.
379
444
size ++;
380
445
bitset .set (pos );
381
446
382
- // If there's not enough space in the current page, allocate a new page:
383
- if (currentDataPage == null || PAGE_SIZE_BYTES - pageCursor < requiredSize ) {
447
+ // If there's not enough space in the current page, allocate a new page (8 bytes are reserved
448
+ // for the end-of-page marker).
449
+ if (currentDataPage == null || PAGE_SIZE_BYTES - 8 - pageCursor < requiredSize ) {
450
+ if (currentDataPage != null ) {
451
+ // There wasn't enough space in the current page, so write an end-of-page marker:
452
+ final Object pageBaseObject = currentDataPage .getBaseObject ();
453
+ final long lengthOffsetInPage = currentDataPage .getBaseOffset () + pageCursor ;
454
+ PlatformDependent .UNSAFE .putLong (pageBaseObject , lengthOffsetInPage , END_OF_PAGE_MARKER );
455
+ }
384
456
MemoryBlock newPage = memoryManager .allocatePage (PAGE_SIZE_BYTES );
385
457
dataPages .add (newPage );
386
458
pageCursor = 0 ;
@@ -414,7 +486,7 @@ public void putNewKey(
414
486
longArray .set (pos * 2 + 1 , keyHashcode );
415
487
updateAddressesAndSizes (storedKeyAddress );
416
488
isDefined = true ;
417
- if (size > growthThreshold ) {
489
+ if (size > growthThreshold && longArray . size () < MAX_CAPACITY ) {
418
490
growAndRehash ();
419
491
}
420
492
}
@@ -427,8 +499,11 @@ public void putNewKey(
427
499
* @param capacity the new map capacity
428
500
*/
429
501
private void allocate (int capacity ) {
430
- capacity = Math .max ((int ) Math .min (Integer .MAX_VALUE , nextPowerOf2 (capacity )), 64 );
431
- longArray = new LongArray (memoryManager .allocate (capacity * 8 * 2 ));
502
+ assert (capacity >= 0 );
503
+ // The capacity needs to be divisible by 64 so that our bit set can be sized properly
504
+ capacity = Math .max ((int ) Math .min (MAX_CAPACITY , nextPowerOf2 (capacity )), 64 );
505
+ assert (capacity <= MAX_CAPACITY );
506
+ longArray = new LongArray (memoryManager .allocate (capacity * 8L * 2 ));
432
507
bitset = new BitSet (MemoryBlock .fromLongArray (new long [capacity / 64 ]));
433
508
434
509
this .growthThreshold = (int ) (capacity * loadFactor );
@@ -494,10 +569,16 @@ public long getNumHashCollisions() {
494
569
return numHashCollisions ;
495
570
}
496
571
572
+ @ VisibleForTesting
573
+ int getNumDataPages () {
574
+ return dataPages .size ();
575
+ }
576
+
497
577
/**
498
578
* Grows the size of the hash table and re-hash everything.
499
579
*/
500
- private void growAndRehash () {
580
+ @ VisibleForTesting
581
+ void growAndRehash () {
501
582
long resizeStartTime = -1 ;
502
583
if (enablePerfMetrics ) {
503
584
resizeStartTime = System .nanoTime ();
@@ -508,7 +589,7 @@ private void growAndRehash() {
508
589
final int oldCapacity = (int ) oldBitSet .capacity ();
509
590
510
591
// Allocate the new data structures
511
- allocate (Math .min (Integer . MAX_VALUE , growthStrategy .nextCapacity (oldCapacity )));
592
+ allocate (Math .min (growthStrategy .nextCapacity (oldCapacity ), MAX_CAPACITY ));
512
593
513
594
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
514
595
for (int pos = oldBitSet .nextSetBit (0 ); pos >= 0 ; pos = oldBitSet .nextSetBit (pos + 1 )) {
0 commit comments