
Commit 6c35865

artemrd authored and cloud-fan committed
[SPARK-22371][CORE] Return None instead of throwing an exception when an accumulator is garbage collected.
## What changes were proposed in this pull request?

There is a window during which an accumulator has been garbage collected but has not yet been removed from AccumulatorContext.originals by ContextCleaner. When an update arrives for such an accumulator, it throws an exception and kills the whole job. This can happen when a stage completes while tasks from other attempts, speculative execution, etc. are still running. Since AccumulatorContext.get() already returns an Option, it can simply return None in this case.

## How was this patch tested?

Unit test.

Author: Artem Rudoy <[email protected]>

Closes #21114 from artemrd/SPARK-22371.
1 parent 3e66350 commit 6c35865
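
The change alters the contract of AccumulatorContext.get(): an accumulator that has been garbage collected but not yet cleaned up now yields None instead of throwing. A minimal caller-side sketch of that contract (hypothetical names: `UpdateHandler` and `applyUpdate` are not part of Spark, and the code is placed in an `org.apache.spark` subpackage only because AccumulatorContext is `private[spark]`):

```scala
package org.apache.spark.example

import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}

object UpdateHandler {
  // Apply a task-side partial value to the driver-side original, if it is
  // still alive. After SPARK-22371, a collected accumulator yields None
  // here instead of killing the job with an IllegalStateException.
  def applyUpdate(id: Long, update: AccumulatorV2[Any, Any]): Unit = {
    AccumulatorContext.get(id) match {
      case Some(acc) =>
        // Merge the task-side partial value into the driver-side original.
        acc.asInstanceOf[AccumulatorV2[Any, Any]].merge(update)
      case None =>
        // Unknown id, or the accumulator was collected but not yet removed
        // from AccumulatorContext.originals: drop the update.
    }
  }
}
```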

File tree

2 files changed (+11, -9 lines)


core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala

Lines changed: 9 additions & 5 deletions
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
 
 private[spark] case class AccumulatorMetadata(
@@ -211,7 +212,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
 /**
  * An internal class used to track accumulators by Spark itself.
  */
-private[spark] object AccumulatorContext {
+private[spark] object AccumulatorContext extends Logging {
 
   /**
    * This global map holds the original accumulator objects that are created on the driver.
@@ -258,13 +259,16 @@ private[spark] object AccumulatorContext {
    * Returns the [[AccumulatorV2]] registered with the given ID, if any.
    */
   def get(id: Long): Option[AccumulatorV2[_, _]] = {
-    Option(originals.get(id)).map { ref =>
-      // Since we are storing weak references, we must check whether the underlying data is valid.
+    val ref = originals.get(id)
+    if (ref eq null) {
+      None
+    } else {
+      // Since we are storing weak references, warn when the underlying data is not valid.
       val acc = ref.get
       if (acc eq null) {
-        throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
+        logWarning(s"Attempted to access garbage collected accumulator $id")
       }
-      acc
+      Option(acc)
     }
   }
 
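
For context, `originals` is a ConcurrentHashMap of weak references, so a map entry can outlive its referent in the window between a GC cycle and the ContextCleaner pass. A standalone sketch of that pattern, with illustrative names rather than Spark's (`WeakRegistry` is hypothetical):

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap

object WeakRegistry {
  private val originals = new ConcurrentHashMap[Long, WeakReference[AnyRef]]

  def register(id: Long, value: AnyRef): Unit =
    originals.put(id, new WeakReference(value))

  // Mirrors the fixed get(): None both when the id was never registered
  // and when the referent has been collected but not yet cleaned up.
  def get(id: Long): Option[AnyRef] = {
    val ref = originals.get(id)
    if (ref eq null) None else Option(ref.get)
  }
}
```

Wrapping `ref.get` in Option collapses both stale cases (missing key, cleared referent) into None, which is the behavior this commit adopts.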

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala

Lines changed: 2 additions & 4 deletions
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
     System.gc()
     assert(ref.get.isEmpty)
 
-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
 
     // Getting a normal accumulator. Note: this has to be separate because referencing an
     // accumulator above in an `assert` would keep it from being garbage collected.
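
A simplified sketch of the updated test's shape, inside a test body. It assumes registration through a live SparkContext named `sc` (the real suite uses LocalSparkContext helpers and arranges registration differently), and `System.gc()` is only a best-effort request:

```scala
import scala.ref.WeakReference

import org.apache.spark.util.{AccumulatorContext, LongAccumulator}

var acc: LongAccumulator = new LongAccumulator
sc.register(acc)            // assigns an id and stores a weak reference
val accId = acc.id
val ref = WeakReference(acc)

acc = null                  // drop the last strong reference
System.gc()
assert(ref.get.isEmpty)

// After this change: None instead of an IllegalStateException.
assert(AccumulatorContext.get(accId).isEmpty)
```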
