
Commit 7ccc74b

Moving the 'LargeIteratorSuite' to simply test persistence of iterators. It no longer tries to invoke an OOM error.
1 parent c2fb430 commit 7ccc74b

File tree

2 files changed: +83 -61 lines changed

core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.storage

import org.scalatest.FunSuite

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}


class FlatmapIteratorSuite extends FunSuite with LocalSparkContext {
  /* Tests Spark's ability to handle user-provided iterators from flatMap
   * calls, which may generate more data than fits in available memory. For any
   * memory-based persistence Spark unrolls the iterator into an ArrayBuffer
   * for caching; however, when the user requests DISK_ONLY persistence,
   * the iterator is fed directly to the serializer and written to disk.
   *
   * This also tests the ObjectOutputStream reset rate. When serializing with
   * Java serialization, the serializer caches objects to avoid writing redundant
   * data, but that prevents those objects from being GC'd. Calling 'reset'
   * flushes that state from the serializer, allowing old objects to be collected.
   */
  test("Flatmap Iterator to Disk") {
    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
      .setAppName("iterator_to_disk_test")
    sc = new SparkContext(sconf)
    try {
      val expand_size = 100
      val data = sc.parallelize((1 to 5).toSeq).
        flatMap(x => Stream.range(0, expand_size))
      val persisted = data.persist(StorageLevel.DISK_ONLY)
      println(persisted.count())
      assert(persisted.count() == 500)
      assert(persisted.filter(_ == 1).count() == 5)
    } catch {
      case _: OutOfMemoryError => assert(false)
    }
  }

  test("Flatmap Iterator to Memory") {
    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
      .setAppName("iterator_to_memory_test")
    sc = new SparkContext(sconf)
    try {
      val expand_size = 100
      val data = sc.parallelize((1 to 5).toSeq).
        flatMap(x => Stream.range(0, expand_size))
      val persisted = data.persist(StorageLevel.MEMORY_ONLY)
      println(persisted.count())
      assert(persisted.count() == 500)
      assert(persisted.filter(_ == 1).count() == 5)
    } catch {
      case _: OutOfMemoryError => assert(false)
    }
  }

  test("Serializer Reset") {
    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
      .setAppName("serializer_reset_test")
      .set("spark.serializer.objectStreamReset", "10")
    sc = new SparkContext(sconf)
    val expand_size = 500
    val data = sc.parallelize(Seq(1, 2)).
      flatMap(x => Stream.range(1, expand_size).
        map(y => "%d: string test %d".format(y, x)))
    val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
    assert(persisted.filter(_.startsWith("1:")).count() == 2)
  }

}
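
The ObjectOutputStream behavior described in the suite's header comment can be illustrated outside Spark. Below is a minimal standalone sketch (not part of this commit; the object name and payload are purely illustrative) showing that java.io.ObjectOutputStream caches previously written objects and only emits a small back-reference on repeated writes, until reset() clears that cache:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Illustrative demo, not part of the Spark test suite.
object ObjectStreamResetDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    val payload = "string test " * 1000

    out.writeObject(payload)
    out.flush()
    val afterFirst = buf.size()

    // The stream remembers 'payload', so this second write emits only a tiny
    // back-reference -- and the cached object cannot be garbage collected.
    out.writeObject(payload)
    out.flush()
    val afterSecond = buf.size()

    // reset() clears the stream's reference table: the object is written in
    // full again, and previously cached objects become eligible for GC.
    out.reset()
    out.writeObject(payload)
    out.flush()

    println(s"first write: $afterFirst bytes, " +
      s"repeat: ${afterSecond - afterFirst} bytes, " +
      s"after reset: ${buf.size() - afterSecond} bytes")
  }
}

In Spark, spark.serializer.objectStreamReset (set to 10 in the 'Serializer Reset' test above) controls how many objects the JavaSerializer writes between such resets, bounding the memory held by the serializer's reference cache.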

core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala

Lines changed: 0 additions & 61 deletions
This file was deleted.

0 commit comments
