17
17
18
18
package org .apache .spark .unsafe .memory ;
19
19
20
+ import java .lang .ref .SoftReference ;
21
+ import java .util .HashMap ;
22
+ import java .util .LinkedList ;
23
+ import java .util .Map ;
24
+ import javax .annotation .concurrent .GuardedBy ;
25
+
20
26
/**
21
27
* Manages memory for an executor. Individual operators / tasks allocate memory through
22
28
* {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager.
@@ -33,6 +39,12 @@ public class ExecutorMemoryManager {
33
39
*/
34
40
final boolean inHeap ;
35
41
42
+ @ GuardedBy ("this" )
43
+ private final Map <Long , LinkedList <SoftReference <MemoryBlock >>> bufferPoolsBySize =
44
+ new HashMap <Long , LinkedList <SoftReference <MemoryBlock >>>();
45
+
46
+ private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024 ;
47
+
36
48
/**
37
49
* Construct a new ExecutorMemoryManager.
38
50
*
@@ -43,16 +55,57 @@ public ExecutorMemoryManager(MemoryAllocator allocator) {
43
55
this .allocator = allocator ;
44
56
}
45
57
58
+ /**
59
+ * Returns true if allocations of the given size should go through the pooling mechanism and
60
+ * false otherwise.
61
+ */
62
+ private boolean shouldPool (long size ) {
63
+ // Very small allocations are less likely to benefit from pooling.
64
+ // At some point, we should explore supporting pooling for off-heap memory, but for now we'll
65
+ // ignore that case in the interest of simplicity.
66
+ return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator ;
67
+ }
68
+
46
69
/**
47
70
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
48
71
* to be zeroed out (call `zero()` on the result if this is necessary).
49
72
*/
50
73
MemoryBlock allocate (long size ) throws OutOfMemoryError {
51
- return allocator .allocate (size );
74
+ if (shouldPool (size )) {
75
+ synchronized (this ) {
76
+ final LinkedList <SoftReference <MemoryBlock >> pool = bufferPoolsBySize .get (size );
77
+ if (pool != null ) {
78
+ while (!pool .isEmpty ()) {
79
+ final SoftReference <MemoryBlock > blockReference = pool .pop ();
80
+ final MemoryBlock memory = blockReference .get ();
81
+ if (memory != null ) {
82
+ assert (memory .size () == size );
83
+ return memory ;
84
+ }
85
+ }
86
+ bufferPoolsBySize .remove (size );
87
+ }
88
+ }
89
+ return allocator .allocate (size );
90
+ } else {
91
+ return allocator .allocate (size );
92
+ }
52
93
}
53
94
54
95
void free (MemoryBlock memory ) {
55
- allocator .free (memory );
96
+ final long size = memory .size ();
97
+ if (shouldPool (size )) {
98
+ synchronized (this ) {
99
+ LinkedList <SoftReference <MemoryBlock >> pool = bufferPoolsBySize .get (size );
100
+ if (pool == null ) {
101
+ pool = new LinkedList <SoftReference <MemoryBlock >>();
102
+ bufferPoolsBySize .put (size , pool );
103
+ }
104
+ pool .add (new SoftReference <MemoryBlock >(memory ));
105
+ }
106
+ } else {
107
+ allocator .free (memory );
108
+ }
56
109
}
57
110
58
111
}
0 commit comments