Skip to content

Commit a4866e3

Browse files
author
Andrew Or
committed
Add tests (still WIP)
The existing ones are not passing yet because cleaning closures is not idempotent. This will be added in a future commit.
1 parent 438c68f commit a4866e3

File tree

3 files changed

+312
-5
lines changed

3 files changed

+312
-5
lines changed

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException}
2929
/**
3030
* A cleaner that renders closures serializable if they can be done so safely.
3131
*/
32-
private[spark] object ClosureCleaner extends Logging {
32+
object ClosureCleaner extends Logging {
3333

3434
// Get an ASM class reader for a given class from the JAR that loaded it
3535
def getClassReader(cls: Class[_]): ClassReader = {
@@ -182,11 +182,19 @@ private[spark] object ClosureCleaner extends Logging {
182182
val outerClasses = getOuterClasses(func)
183183
val outerObjects = getOuterObjects(func)
184184

185-
logDebug(s" + inner classes: " + innerClasses.size)
185+
// For logging purposes only
186+
val declaredFields = func.getClass.getDeclaredFields
187+
val declaredMethods = func.getClass.getDeclaredMethods
188+
189+
logDebug(" + declared fields: " + declaredFields.size)
190+
declaredFields.foreach { f => logDebug(" " + f) }
191+
logDebug(" + declared methods: " + declaredMethods.size)
192+
declaredMethods.foreach { m => logDebug(" " + m) }
193+
logDebug(" + inner classes: " + innerClasses.size)
186194
innerClasses.foreach { c => logDebug(" " + c.getName) }
187-
logDebug(s" + outer classes: " + outerClasses.size)
195+
logDebug(" + outer classes: " + outerClasses.size)
188196
outerClasses.foreach { c => logDebug(" " + c.getName) }
189-
logDebug(s" + outer objects: " + outerObjects.size)
197+
logDebug(" + outer objects: " + outerObjects.size)
190198
outerObjects.foreach { o => logDebug(" " + o) }
191199

192200
// Fail fast if we detect return statements in closures
@@ -388,6 +396,7 @@ class FieldAccessFinder(
388396
fields(cl) += name
389397
}
390398
// Visit other methods to find fields that are transitively referenced
399+
// FIXME: This could lead to infinite cycles!!
391400
if (findTransitively) {
392401
ClosureCleaner.getClassReader(cl)
393402
.accept(new FieldAccessFinder(fields, Set(name), findTransitively), 0)

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,14 @@ class ClosureCleanerSuite extends FunSuite {
6767

6868
// A non-serializable class we create in closures to make sure that we aren't
6969
// keeping references to unneeded variables from our outer closures.
70-
class NonSerializable {}
70+
class NonSerializable(val id: Int = -1) {
71+
override def equals(other: Any): Boolean = {
72+
other match {
73+
case o: NonSerializable => id == o.id
74+
case _ => false
75+
}
76+
}
77+
}
7178

7279
object TestObject {
7380
def run(): Int = {
Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
import java.io.NotSerializableException
21+
22+
import org.scalatest.{BeforeAndAfterAll, FunSuite}
23+
24+
import org.apache.spark.{SparkContext, SparkException}
25+
import org.apache.spark.serializer.SerializerInstance
26+
27+
// TODO: REMOVE ME
28+
import java.util.Properties
29+
import org.apache.log4j.PropertyConfigurator
30+
31+
/**
32+
* Another test suite for the closure cleaner that is finer-grained.
33+
* For tests involving end-to-end Spark jobs, see {{ClosureCleanerSuite}}.
34+
*/
35+
class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll {
36+
37+
// Start a SparkContext so that SparkEnv.get.closureSerializer is accessible
38+
// We do not actually use this explicitly except to stop it later
39+
private var sc: SparkContext = null
40+
private var closureSerializer: SerializerInstance = null
41+
42+
override def beforeAll(): Unit = {
43+
sc = new SparkContext("local", "test")
44+
closureSerializer = sc.env.closureSerializer.newInstance()
45+
}
46+
47+
override def afterAll(): Unit = {
48+
sc.stop()
49+
sc = null
50+
closureSerializer = null
51+
}
52+
53+
// Some fields and methods that belong to this class, which is itself not serializable
54+
private val someSerializableValue = 1
55+
private val someNonSerializableValue = new NonSerializable
56+
private def someSerializableMethod() = 1
57+
private def someNonSerializableMethod() = new NonSerializable
58+
59+
private def assertSerializable(closure: AnyRef, serializable: Boolean): Unit = {
60+
if (serializable) {
61+
closureSerializer.serialize(closure)
62+
} else {
63+
intercept[NotSerializableException] {
64+
closureSerializer.serialize(closure)
65+
}
66+
}
67+
}
68+
69+
private def testClean(
70+
closure: AnyRef,
71+
serializableBefore: Boolean,
72+
serializableAfter: Boolean): Unit = {
73+
testClean(closure, serializableBefore, serializableAfter, transitive = true)
74+
testClean(closure, serializableBefore, serializableAfter, transitive = false)
75+
}
76+
77+
private def testClean(
78+
closure: AnyRef,
79+
serializableBefore: Boolean,
80+
serializableAfter: Boolean,
81+
transitive: Boolean): Unit = {
82+
assertSerializable(closure, serializableBefore)
83+
// If the resulting closure is not serializable even after
84+
// cleaning, we expect ClosureCleaner to throw a SparkException
85+
intercept[SparkException] {
86+
ClosureCleaner.clean(closure, checkSerializable = true, transitive)
87+
// Otherwise, if we do expect the closure to be serializable after the
88+
// clean, throw the SparkException ourselves so scalatest is happy
89+
if (serializableAfter) { throw new SparkException("no-op") }
90+
}
91+
assertSerializable(closure, serializableAfter)
92+
}
93+
94+
test("clean basic serializable closures") {
95+
val localSerializableVal = someSerializableValue
96+
val closure1 = () => 1
97+
val closure2 = () => Array[String]("a", "b", "c")
98+
val closure3 = (s: String, arr: Array[Long]) => s + arr.mkString(", ")
99+
val closure4 = () => localSerializableVal
100+
val closure5 = () => new NonSerializable(5) // we're just serializing the class information
101+
val closure1r = closure1()
102+
val closure2r = closure2()
103+
val closure3r = closure3("g", Array(1, 5, 8))
104+
val closure4r = closure4()
105+
val closure5r = closure5()
106+
107+
testClean(closure1, serializableBefore = true, serializableAfter = true)
108+
testClean(closure2, serializableBefore = true, serializableAfter = true)
109+
testClean(closure3, serializableBefore = true, serializableAfter = true)
110+
testClean(closure4, serializableBefore = true, serializableAfter = true)
111+
testClean(closure5, serializableBefore = true, serializableAfter = true)
112+
113+
// Verify that closures can still be invoked and the result still the same
114+
assert(closure1() === closure1r)
115+
assert(closure2() === closure2r)
116+
assert(closure3("g", Array(1, 5, 8)) === closure3r)
117+
assert(closure4() === closure4r)
118+
assert(closure5() === closure5r)
119+
}
120+
121+
test("clean basic non-serializable closures") {
122+
val closure1 = () => this // ClosureCleanerSuite2 is not serializable
123+
val closure5 = () => someSerializableValue
124+
val closure3 = () => someSerializableMethod()
125+
val closure4 = () => someNonSerializableValue
126+
val closure2 = () => someNonSerializableMethod()
127+
128+
// These are not cleanable because they ultimately reference the `this` pointer
129+
testClean(closure1, serializableBefore = false, serializableAfter = false)
130+
testClean(closure2, serializableBefore = false, serializableAfter = false)
131+
testClean(closure3, serializableBefore = false, serializableAfter = false)
132+
testClean(closure4, serializableBefore = false, serializableAfter = false)
133+
testClean(closure5, serializableBefore = false, serializableAfter = false)
134+
}
135+
136+
test("clean basic nested serializable closures") {
137+
val localSerializableValue = someSerializableValue
138+
val closure1 = (i: Int) => {
139+
(1 to i).map { x => x + localSerializableValue } // 1 level of nesting
140+
}
141+
val closure2 = (j: Int) => {
142+
(1 to j).flatMap { x =>
143+
(1 to x).map { y => y + localSerializableValue } // 2 levels
144+
}
145+
}
146+
val closure3 = (k: Int, l: Int, m: Int) => {
147+
(1 to k).flatMap(closure2) ++ // 4 levels
148+
(1 to l).flatMap(closure1) ++ // 3 levels
149+
(1 to m).map { x => x + 1 } // 2 levels
150+
}
151+
val closure1r = closure1(1)
152+
val closure2r = closure2(2)
153+
val closure3r = closure3(3, 4, 5)
154+
155+
testClean(closure1, serializableBefore = true, serializableAfter = true)
156+
testClean(closure2, serializableBefore = true, serializableAfter = true)
157+
testClean(closure3, serializableBefore = true, serializableAfter = true)
158+
159+
assert(closure1(1) === closure1r)
160+
assert(closure2(2) === closure2r)
161+
assert(closure3(3, 4, 5) === closure3r)
162+
}
163+
164+
test("clean basic nested non-serializable closures") {
165+
def localSerializableMethod() = someSerializableValue
166+
val localNonSerializableValue = someNonSerializableValue
167+
val closure1 = (i: Int) => { (1 to i).map { x => x + someSerializableValue } }
168+
val closure2 = (j: Int) => { (1 to j).map { x => x + someSerializableMethod() } }
169+
val closure4 = (k: Int) => { (1 to k).map { x => x + localSerializableMethod() } }
170+
val closure3 = (l: Int) => { (1 to l).map { x => localNonSerializableValue } }
171+
// This is non-serializable no matter how many levels we nest it
172+
val closure5 = (m: Int) => {
173+
(1 to m).foreach { x =>
174+
(1 to x).foreach { y =>
175+
(1 to y).foreach { z =>
176+
someSerializableValue
177+
}
178+
}
179+
}
180+
}
181+
182+
testClean(closure1, serializableBefore = false, serializableAfter = false)
183+
testClean(closure2, serializableBefore = false, serializableAfter = false)
184+
testClean(closure3, serializableBefore = false, serializableAfter = false)
185+
testClean(closure4, serializableBefore = false, serializableAfter = false)
186+
testClean(closure5, serializableBefore = false, serializableAfter = false)
187+
}
188+
189+
test("clean complicated nested serializable closures") {
190+
val localSerializableValue = someSerializableValue
191+
192+
// Reference local fields from all levels
193+
val closure1 = (i: Int) => {
194+
val a = 1
195+
(1 to i).flatMap { x =>
196+
val b = a + 1
197+
(1 to x).map { y =>
198+
y + a + b + localSerializableValue
199+
}
200+
}
201+
}
202+
203+
// Reference local fields and methods from all levels within the outermost closure
204+
val closure2 = (i: Int) => {
205+
val a1 = 1
206+
def a2 = 2
207+
(1 to i).flatMap { x =>
208+
val b1 = a1 + 1
209+
def b2 = a2 + 1
210+
(1 to x).map { y =>
211+
// If this references a method outside the outermost closure, then it will try to pull
212+
// in the ClosureCleanerSuite2. This is why `localSerializableValue` here must be a val.
213+
y + a1 + a2 + b1 + b2 + localSerializableValue
214+
}
215+
}
216+
}
217+
218+
val closure1r = closure1(1)
219+
val closure2r = closure2(2)
220+
testClean(closure1, serializableBefore = true, serializableAfter = true)
221+
testClean(closure2, serializableBefore = true, serializableAfter = true)
222+
assert(closure1(1) == closure1r)
223+
assert(closure2(2) == closure2r)
224+
}
225+
226+
test("clean complicated nested non-serializable closures") {
227+
val localSerializableValue = someSerializableValue
228+
229+
// Note that we are not interested in cleaning the outer closures here
230+
// The only reason why they exist is to nest the inner closures
231+
232+
val test1 = () => {
233+
val a = localSerializableValue
234+
val b = sc
235+
val inner1 = (x: Int) => x + a + b.hashCode()
236+
val inner2 = (x: Int) => x + a
237+
238+
// This closure explicitly references a non-serializable field
239+
// There is no way to clean it
240+
testClean(inner1, serializableBefore = false, serializableAfter = false)
241+
242+
// This closure is serializable to begin with since
243+
// it does not have a pointer to the outer closure
244+
testClean(inner2, serializableBefore = true, serializableAfter = true)
245+
}
246+
247+
// Same as above, but the `val a` becomes `def a`
248+
// The difference here is that all inner closures now have pointers to the outer closure
249+
val test2 = () => {
250+
def a = localSerializableValue
251+
val b = sc
252+
val inner1 = (x: Int) => x + a + b.hashCode()
253+
val inner2 = (x: Int) => x + a
254+
255+
// As before, this closure is neither serializable nor cleanable
256+
testClean(inner1, serializableBefore = false, serializableAfter = false)
257+
258+
// This closure is no longer serializable because it now has a pointer to the outer closure,
259+
// which is itself not serializable because it has a pointer to the ClosureCleanerSuite.
260+
// If we do not clean transitively, we will not null out this parent pointer.
261+
testClean(inner2, serializableBefore = false, serializableAfter = false, transitive = false)
262+
263+
// If we clean transitively, we will find that method `a` does not actually reference the
264+
// outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
265+
// the outer closure's parent pointer. This will make `inner2` serializable.
266+
testClean(inner2, serializableBefore = false, serializableAfter = true, transitive = true)
267+
}
268+
269+
test1()
270+
test2()
271+
}
272+
273+
274+
275+
276+
277+
// TODO: REMOVE ME
278+
configureLog4j()
279+
private def configureLog4j(): Unit = {
280+
val pro = new Properties()
281+
pro.put("log4j.rootLogger", "WARN, console")
282+
pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
283+
pro.put("log4j.appender.console.target", "System.err")
284+
pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
285+
pro.put("log4j.appender.console.layout.ConversionPattern",
286+
"%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
287+
pro.put("log4j.logger.org.apache.spark.util.ClosureCleaner", "DEBUG")
288+
PropertyConfigurator.configure(pro)
289+
}
290+
291+
}

0 commit comments

Comments
 (0)