@@ -18,7 +18,6 @@
 package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.lang.reflect.Field
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
@@ -65,27 +64,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
-  /**
-   * Keep track of the reference queue length and log an error if this exceeds a certain capacity.
-   * Unfortunately, Java's ReferenceQueue exposes neither the queue length nor the enqueue method,
-   * so we have to do this through reflection. This is expensive, however, so we should access
-   * this field only once in a while.
-   */
-  private val queueCapacity = 10000
-  private var queueFullErrorMessageLogged = false
-  private val queueLengthAccessor: Option[Field] = {
-    try {
-      val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
-      f.setAccessible(true)
-      Some(f)
-    } catch {
-      case e: Exception =>
-        logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)
-        None
-    }
-  }
-  private val logQueueLengthInterval = 1000
-
   /**
    * Whether the cleaning thread will block on cleanup tasks.
    *
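The block removed above is the entire reflection apparatus. As a standalone illustration (not part of this patch; `QueueLengthProbe` is an invented name), the trick it relied on looks like the sketch below. Reflective field access is slow relative to a plain read, and on JDK 9+ the module system may refuse `setAccessible(true)` on `java.base` internals entirely, which is part of why this approach is fragile.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

object QueueLengthProbe {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    // Keep the WeakReference object itself strongly reachable.
    val ref = new WeakReference[AnyRef](new Object, queue)

    // queueLength is private in ReferenceQueue, so ordinary code cannot read
    // it; reflection plus setAccessible(true) bypasses the access check.
    val field = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
    field.setAccessible(true)

    // Prints 0 here: the referent has not been collected and enqueued yet.
    println("queue length: " + field.getLong(queue))
  }
}
```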
@@ -139,7 +117,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
-    var iteration = 0
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
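For context on the loop this hunk touches: `keepCleaning()` is built on the standard `WeakReference` plus `ReferenceQueue` idiom, where the GC enqueues a reference once its referent is collected and a polling thread dequeues it to drive cleanup. A minimal self-contained sketch of that idiom follows; names like `CleanupRef` and `PollLoopSketch` are invented here, not Spark's.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Carries a cleanup task alongside the weak reference, in the spirit of
// Spark's CleanupTaskWeakReference (the task here is just a string).
class CleanupRef(referent: AnyRef, val task: String, q: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, q)

object PollLoopSketch {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    var payload: AnyRef = new Object
    val ref = new CleanupRef(payload, "clean rdd 0", queue)

    payload = null // drop the last strong reference to the referent
    System.gc()    // request a collection (best effort only)

    // remove(timeout) blocks for up to `timeout` ms and returns null on
    // timeout, which is why the real loop wraps the result in Option(...).
    var done = false
    while (!done) {
      Option(queue.remove(100)) match {
        case Some(r: CleanupRef) =>
          println("got cleanup task: " + r.task)
          done = true
        case _ =>
          // timed out: loop again (the real loop also checks a stop flag)
      }
    }
  }
}
```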
@@ -155,14 +132,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             case CleanBroadcast(broadcastId) =>
               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           }
-          if (iteration % logQueueLengthInterval == 0) {
-            logQueueLength()
-          }
         }
       } catch {
         case e: Exception => logError("Error in cleaning thread", e)
       }
-      iteration += 1
     }
   }
 
@@ -203,41 +176,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /**
-   * Log the length of the reference queue through reflection.
-   * This is an expensive operation and should be called sparingly.
-   */
-  private def logQueueLength(): Unit = {
-    try {
-      queueLengthAccessor.foreach { field =>
-        val length = field.getLong(referenceQueue)
-        logDebug("Reference queue size is " + length)
-        if (length > queueCapacity) {
-          logQueueFullErrorMessage()
-        }
-      }
-    } catch {
-      case e: Exception =>
-        logDebug("Failed to access reference queue's length through reflection: " + e)
-    }
-  }
-
-  /**
-   * Log an error message to indicate that the queue has exceeded its capacity. Do this only once.
-   */
-  private def logQueueFullErrorMessage(): Unit = {
-    if (!queueFullErrorMessageLogged) {
-      queueFullErrorMessageLogged = true
-      logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
-        "This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
-      if (blockOnCleanupTasks) {
-        logError("Consider setting spark.cleaner.referenceTracking.blocking to false. " +
-          "Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
-          "the workload involves creating many RDDs in quick successions.")
-      }
-    }
-  }
-
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
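The patch removes the reflective length tracking outright rather than replacing it. If one did want backlog visibility without reflection, a common alternative is to count explicitly: increment an `AtomicLong` when a reference is registered and decrement when one is dequeued. The sketch below uses invented names (`PendingCounter`) and is not Spark code; note it measures references registered but not yet processed, a proxy for backlog rather than the queue's internal length.

```scala
import java.lang.ref.{Reference, ReferenceQueue, WeakReference}
import java.util.concurrent.atomic.AtomicLong

class PendingCounter[T <: AnyRef] {
  private val queue = new ReferenceQueue[T]
  private val pending = new AtomicLong(0)

  /** Register a referent; counts it as pending until it is dequeued. */
  def register(referent: T): WeakReference[T] = {
    pending.incrementAndGet()
    new WeakReference[T](referent, queue)
  }

  /** Poll for a collected reference, keeping the counter in sync. */
  def poll(timeoutMs: Long): Option[Reference[_ <: T]] = {
    val ref = Option(queue.remove(timeoutMs))
    ref.foreach(_ => pending.decrementAndGet())
    ref
  }

  /** Cheap O(1) read; no reflection into ReferenceQueue internals. */
  def pendingCount: Long = pending.get()
}
```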
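Finally, the removed error message pointed users at `spark.cleaner.referenceTracking.blocking`, which is set like any other Spark configuration entry. A usage sketch follows; the app name and master are placeholders, and behavior and defaults may differ across Spark versions (SPARK-3015 is the caveat the message referenced).

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CleanerConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cleaner-config-example")
      .setMaster("local[2]")
      // Per the removed message's advice: stop cleanup tasks from blocking
      // the cleaning thread.
      .set("spark.cleaner.referenceTracking.blocking", "false")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 10).count() // any job; cleanup runs in the background
    } finally {
      sc.stop()
    }
  }
}
```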