@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
+import java.lang.reflect.Field
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
@@ -64,9 +65,26 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
-  // Capacity of the reference buffer before we log an error message
-  private val referenceBufferCapacity = 10000
+  /**
+   * Keep track of the reference queue length and log an error if this exceeds a certain capacity.
+   * Unfortunately, Java's ReferenceQueue exposes neither the queue length nor the enqueue method,
+   * so we have to do this through reflection. This is expensive, however, so we should access
+   * this field only once in a while.
+   */
+  private val queueCapacity = 10000
   private var queueFullErrorMessageLogged = false
+  private val queueLengthAccessor: Option[Field] = {
+    try {
+      val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
+      f.setAccessible(true)
+      Some(f)
+    } catch {
+      case e: Exception =>
+        logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)
+        None
+    }
+  }
+  private val logQueueLengthInterval = 1000
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
@@ -117,13 +135,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Register an object for cleanup. */
   private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
     referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
-    if (referenceBuffer.size > referenceBufferCapacity) {
-      logQueueFullErrorMessage()
-    }
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+    var iteration = 0
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -140,10 +156,14 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             case CleanBroadcast(broadcastId) =>
               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           }
+          if (iteration % logQueueLengthInterval == 0) {
+            logQueueLength()
+          }
         }
       } catch {
         case e: Exception => logError("Error in cleaning thread", e)
       }
+      iteration += 1
     }
   }
 
@@ -190,16 +210,35 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def logQueueFullErrorMessage(): Unit = {
     if (!queueFullErrorMessageLogged) {
       queueFullErrorMessageLogged = true
-      logError(s"Reference queue size in ContextCleaner has exceeded $referenceBufferCapacity! " +
+      logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
         "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
-      if (!blockOnCleanupTasks) {
+      if (blockOnCleanupTasks) {
         logError("Consider setting spark.cleaner.referenceTracking.blocking to false." +
           " Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
           " the workload involves creating many RDDs in quick successions.")
       }
     }
   }
 
+  /**
+   * Log the length of the reference queue through reflection.
+   * This is an expensive operation and should be called sparingly.
+   */
+  private def logQueueLength(): Unit = {
+    try {
+      queueLengthAccessor.foreach { field =>
+        val length = field.getLong(referenceQueue)
+        logDebug("Reference queue size is " + length)
+        if (length > queueCapacity) {
+          logQueueFullErrorMessage()
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logDebug("Failed to access reference queue's length through reflection: " + e)
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
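
For readers unfamiliar with the reflection trick above: java.lang.ref.ReferenceQueue keeps its length in a private `queueLength` field, so the only way to observe it is via `getDeclaredField`/`setAccessible`. Below is a minimal standalone sketch of the same access pattern outside of Spark; the object name `QueueLengthSketch`, the `println` calls, and the `System.gc()`/`Thread.sleep` nudges are illustrative only, and on restrictive JVMs `setAccessible` may be refused, which is exactly why the accessor in the patch is wrapped in an `Option`.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.lang.reflect.Field

// Minimal sketch (hypothetical, not part of this patch) of reading
// ReferenceQueue's private `queueLength` field through reflection.
object QueueLengthSketch {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    // Hold the WeakReferences strongly so only their referents can be collected.
    val refs = (1 to 5).map(_ => new WeakReference[AnyRef](new Object, queue))

    // Look the field up once; this can fail (e.g. under a restrictive JDK),
    // so treat the accessor as optional rather than assuming it works.
    val lengthField: Option[Field] =
      try {
        val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
        f.setAccessible(true)
        Some(f)
      } catch {
        case e: Exception =>
          println("Could not expose queueLength: " + e)
          None
      }

    // Encourage collection so some references get enqueued (best effort only).
    System.gc()
    Thread.sleep(100)

    lengthField.foreach { f =>
      println(s"Approximate queue length: ${f.getLong(queue)} (of ${refs.size} references)")
    }
  }
}
```

Because the field lookup and read are comparatively expensive, the patch only samples the queue length every `logQueueLengthInterval` iterations of the cleaning loop rather than on every pass.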