@@ -32,8 +32,8 @@ import org.apache.spark.serializer.SerializerInstance
32
32
*/
33
33
class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
34
34
35
- // Start a SparkContext so that SparkEnv.get.closureSerializer is accessible
36
- // We do not actually use this explicitly except to stop it later
35
+ // Start a SparkContext so that the closure serializer is accessible
36
+ // We do not actually use this explicitly otherwise
37
37
private var sc : SparkContext = null
38
38
private var closureSerializer : SerializerInstance = null
39
39
@@ -48,7 +48,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
48
48
closureSerializer = null
49
49
}
50
50
51
- // Some fields and methods that belong to this class, which is itself not serializable
51
+ // Some fields and methods to reference in inner closures later
52
52
private val someSerializableValue = 1
53
53
private val someNonSerializableValue = new NonSerializable
54
54
private def someSerializableMethod () = 1
@@ -86,19 +86,19 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
86
86
assertSerializable(closure, serializableBefore)
87
87
// If the resulting closure is not serializable even after
88
88
// cleaning, we expect ClosureCleaner to throw a SparkException
89
- intercept[ SparkException ] {
89
+ if (serializableAfter) {
90
90
ClosureCleaner .clean(closure, checkSerializable = true , transitive)
91
- // Otherwise, if we do expect the closure to be serializable after the
92
- // clean, throw the SparkException ourselves so scalatest is happy
93
- if (serializableAfter) { throw new SparkException (" no-op" ) }
91
+ } else {
92
+ intercept[SparkException ] {
93
+ ClosureCleaner .clean(closure, checkSerializable = true , transitive)
94
+ }
94
95
}
95
96
assertSerializable(closure, serializableAfter)
96
97
}
97
98
98
99
/**
99
100
* Return the fields accessed by the given closure by class.
100
- * This also optionally finds the fields transitively referenced through methods
101
- * that belong to other classes.
101
+ * This also optionally finds the fields transitively referenced through methods invocations.
102
102
*/
103
103
private def findAccessedFields (
104
104
closure : AnyRef ,
@@ -211,7 +211,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
211
211
val outerObjects2 = getOuterObjects(closure2)
212
212
assert(outerClasses1.size === outerObjects1.size)
213
213
assert(outerClasses2.size === outerObjects2.size)
214
- // These inner closures only reference local variables, and so do not have $outer pointer
214
+ // These inner closures only reference local variables, and so do not have $outer pointers
215
215
assert(outerClasses1.isEmpty)
216
216
assert(outerClasses2.isEmpty)
217
217
}
@@ -235,8 +235,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
235
235
// This closure references the "test2" scope because it needs to find the method `y`
236
236
// Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2
237
237
assert(outerClasses2.size === 3 )
238
- // This closure references the "test2" scope because it needs to find the
239
- // `localValue` defined outside of this scope
238
+ // This closure references the "test2" scope because it needs to find the `localValue`
239
+ // defined outside of this scope
240
240
assert(outerClasses3.size === 3 )
241
241
assert(isClosure(outerClasses2(0 )))
242
242
assert(isClosure(outerClasses3(0 )))
@@ -270,10 +270,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
270
270
assert(fields2.isEmpty)
271
271
assert(fields3.size === 2 )
272
272
// This corresponds to the "FunSuite#test" closure. This is empty because the
273
- // field `closure3` references belongs to its parent (i.e. ClosureCleanerSuite2)
273
+ // `someSerializableValue` belongs to its parent (i.e. ClosureCleanerSuite2).
274
274
assert(fields3(outerClasses3(0 )).isEmpty)
275
275
// This corresponds to the ClosureCleanerSuite2. This is also empty, however,
276
- // because we did not find fields transitively (i.e. beyond 1 enclosing scope)
276
+ // because accessing a `ClosureCleanerSuite2#someSerializableValue` actually involves a
277
+ // method call. Since we do not find fields transitively, we will not recursively trace
278
+ // through the fields referenced by this method.
277
279
assert(fields3(outerClasses3(1 )).isEmpty)
278
280
279
281
val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true )
@@ -283,7 +285,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
283
285
assert(fields2t.isEmpty)
284
286
assert(fields3t.size === 2 )
285
287
// Because we find fields transitively now, we are able to detect that we need the
286
- // $outer pointer to get the field from the ClosureCleanerSuite2.
288
+ // $outer pointer to get the field from the ClosureCleanerSuite2
287
289
assert(fields3t(outerClasses3(0 )).size === 1 )
288
290
assert(fields3t(outerClasses3(0 )).head === " $outer" )
289
291
assert(fields3t(outerClasses3(1 )).size === 1 )
@@ -304,31 +306,32 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
304
306
val outerClasses3 = getOuterClasses(closure3)
305
307
val outerClasses4 = getOuterClasses(closure4)
306
308
307
- // First, find only fields the closures directly access
309
+ // First, find only fields accessed directly, not transitively, by these closures
308
310
val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false )
309
311
val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false )
310
312
val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false )
311
313
val fields4 = findAccessedFields(closure4, outerClasses4, findTransitively = false )
312
314
assert(fields1.isEmpty)
313
315
// "test1" < "FunSuite#test" < ClosureCleanerSuite2
314
316
assert(fields2.size === 3 )
315
- assert(fields2(outerClasses2(0 )).isEmpty) // `def a` is not a field
316
- assert(fields2(outerClasses2(1 )).isEmpty)
317
- assert(fields2(outerClasses2(2 )).isEmpty)
317
+ // Since we do not find fields transitively here, we do not look into what `def a` references
318
+ assert(fields2(outerClasses2(0 )).isEmpty) // This corresponds to the "test1" scope
319
+ assert(fields2(outerClasses2(1 )).isEmpty) // This corresponds to the "FunSuite#test" scope
320
+ assert(fields2(outerClasses2(2 )).isEmpty) // This corresponds to the ClosureCleanerSuite2
318
321
assert(fields3.size === 3 )
319
- // Note that `localValue` is a field of the "test1" closure because `def a` needs it
320
- // Further note that it is NOT a field of the "FunSuite#test" closure but a local variable
322
+ // Note that `localValue` is a field of the "test1" scope because `def a` references it,
323
+ // but NOT a field of the "FunSuite#test" scope because it is only a local variable there
321
324
assert(fields3(outerClasses3(0 )).size === 1 )
322
325
assert(fields3(outerClasses3(0 )).head.contains(" localValue" ))
323
326
assert(fields3(outerClasses3(1 )).isEmpty)
324
327
assert(fields3(outerClasses3(2 )).isEmpty)
325
328
assert(fields4.size === 3 )
329
+ // Because `val someSerializableValue` is an instance variable, even an explicit reference
330
+ // here actually involves a method call to access the underlying value of the variable.
331
+ // Because we are not finding fields transitively here, we do not consider the fields
332
+ // accessed by this "method" (i.e. the val's accessor).
326
333
assert(fields4(outerClasses4(0 )).isEmpty)
327
334
assert(fields4(outerClasses4(1 )).isEmpty)
328
- // Because `someSerializableValue` is a val, even an explicit reference here actually
329
- // involves a method call to access the underlying value of the variable. Because we are
330
- // not finding fields transitively here, we do not consider the fields accessed by this
331
- // "method" (i.e. the val's accessor).
332
335
assert(fields4(outerClasses4(2 )).isEmpty)
333
336
334
337
// Now do the same, but find fields that the closures transitively reference
@@ -338,8 +341,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
338
341
val fields4t = findAccessedFields(closure4, outerClasses4, findTransitively = true )
339
342
assert(fields1t.isEmpty)
340
343
assert(fields2t.size === 3 )
341
- // This closure transitively references `localValue` because `def a` uses it
342
- assert(fields2t(outerClasses2(0 )).size === 1 )
344
+ assert(fields2t(outerClasses2(0 )).size === 1 ) // `def a` references `localValue`
343
345
assert(fields2t(outerClasses2(0 )).head.contains(" localValue" ))
344
346
assert(fields2t(outerClasses2(1 )).isEmpty)
345
347
assert(fields2t(outerClasses2(2 )).isEmpty)
@@ -362,11 +364,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
362
364
}
363
365
364
366
test(" clean basic serializable closures" ) {
365
- val localSerializableVal = someSerializableValue
367
+ val localValue = someSerializableValue
366
368
val closure1 = () => 1
367
369
val closure2 = () => Array [String ](" a" , " b" , " c" )
368
370
val closure3 = (s : String , arr : Array [Long ]) => s + arr.mkString(" , " )
369
- val closure4 = () => localSerializableVal
371
+ val closure4 = () => localValue
370
372
val closure5 = () => new NonSerializable (5 ) // we're just serializing the class information
371
373
val closure1r = closure1()
372
374
val closure2r = closure2()
@@ -395,7 +397,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
395
397
val closure4 = () => someNonSerializableValue
396
398
val closure2 = () => someNonSerializableMethod()
397
399
398
- // These are not cleanable because they ultimately reference the `this` pointer
400
+ // These are not cleanable because they ultimately reference the ClosureCleanerSuite2
399
401
testClean(closure1, serializableBefore = false , serializableAfter = false )
400
402
testClean(closure2, serializableBefore = false , serializableAfter = false )
401
403
testClean(closure3, serializableBefore = false , serializableAfter = false )
@@ -404,13 +406,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
404
406
}
405
407
406
408
test(" clean basic nested serializable closures" ) {
407
- val localSerializableValue = someSerializableValue
409
+ val localValue = someSerializableValue
408
410
val closure1 = (i : Int ) => {
409
- (1 to i).map { x => x + localSerializableValue } // 1 level of nesting
411
+ (1 to i).map { x => x + localValue } // 1 level of nesting
410
412
}
411
413
val closure2 = (j : Int ) => {
412
414
(1 to j).flatMap { x =>
413
- (1 to x).map { y => y + localSerializableValue } // 2 levels
415
+ (1 to x).map { y => y + localValue } // 2 levels
414
416
}
415
417
}
416
418
val closure3 = (k : Int , l : Int , m : Int ) => {
@@ -426,6 +428,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
426
428
testClean(closure2, serializableBefore = true , serializableAfter = true )
427
429
testClean(closure3, serializableBefore = true , serializableAfter = true )
428
430
431
+ // Verify that closures can still be invoked and the result still the same
429
432
assert(closure1(1 ) === closure1r)
430
433
assert(closure2(2 ) === closure2r)
431
434
assert(closure3(3 , 4 , 5 ) === closure3r)
@@ -434,9 +437,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
434
437
test(" clean basic nested non-serializable closures" ) {
435
438
def localSerializableMethod () = someSerializableValue
436
439
val localNonSerializableValue = someNonSerializableValue
440
+ // These closures ultimately reference the ClosureCleanerSuite2
441
+ // Note that even accessing `val` that is an instance variable involves a method call
437
442
val closure1 = (i : Int ) => { (1 to i).map { x => x + someSerializableValue } }
438
443
val closure2 = (j : Int ) => { (1 to j).map { x => x + someSerializableMethod() } }
439
444
val closure4 = (k : Int ) => { (1 to k).map { x => x + localSerializableMethod() } }
445
+ // This closure references a local non-serializable value
440
446
val closure3 = (l : Int ) => { (1 to l).map { x => localNonSerializableValue } }
441
447
// This is non-serializable no matter how many levels we nest it
442
448
val closure5 = (m : Int ) => {
@@ -457,15 +463,18 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
457
463
}
458
464
459
465
test(" clean complicated nested serializable closures" ) {
460
- val localSerializableValue = someSerializableValue
466
+ val localValue = someSerializableValue
467
+
468
+ // Here we assume that if the outer closure is serializable,
469
+ // then all inner closures must also be serializable
461
470
462
471
// Reference local fields from all levels
463
472
val closure1 = (i : Int ) => {
464
473
val a = 1
465
474
(1 to i).flatMap { x =>
466
475
val b = a + 1
467
476
(1 to x).map { y =>
468
- y + a + b + localSerializableValue
477
+ y + a + b + localValue
469
478
}
470
479
}
471
480
}
@@ -479,8 +488,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
479
488
def b2 = a2 + 1
480
489
(1 to x).map { y =>
481
490
// If this references a method outside the outermost closure, then it will try to pull
482
- // in the ClosureCleanerSuite2. This is why `localSerializableValue ` here must be a val.
483
- y + a1 + a2 + b1 + b2 + localSerializableValue
491
+ // in the ClosureCleanerSuite2. This is why `localValue ` here must be a local ` val` .
492
+ y + a1 + a2 + b1 + b2 + localValue
484
493
}
485
494
}
486
495
}
@@ -494,13 +503,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
494
503
}
495
504
496
505
test(" clean complicated nested non-serializable closures" ) {
497
- val localSerializableValue = someSerializableValue
506
+ val localValue = someSerializableValue
498
507
499
- // Note that we are not interested in cleaning the outer closures here
508
+ // Note that we are not interested in cleaning the outer closures here (they are not cleanable)
500
509
// The only reason why they exist is to nest the inner closures
501
510
502
511
val test1 = () => {
503
- val a = localSerializableValue
512
+ val a = localValue
504
513
val b = sc
505
514
val inner1 = (x : Int ) => x + a + b.hashCode()
506
515
val inner2 = (x : Int ) => x + a
@@ -509,15 +518,15 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
509
518
// There is no way to clean it
510
519
testClean(inner1, serializableBefore = false , serializableAfter = false )
511
520
512
- // This closure is serializable to begin with since
513
- // it does not have a pointer to the outer closure
521
+ // This closure is serializable to begin with since it does not need a pointer to
522
+ // the outer closure (it only references local variables)
514
523
testClean(inner2, serializableBefore = true , serializableAfter = true )
515
524
}
516
525
517
526
// Same as above, but the `val a` becomes `def a`
518
527
// The difference here is that all inner closures now have pointers to the outer closure
519
528
val test2 = () => {
520
- def a = localSerializableValue
529
+ def a = localValue
521
530
val b = sc
522
531
val inner1 = (x : Int ) => x + a + b.hashCode()
523
532
val inner2 = (x : Int ) => x + a
0 commit comments