Commit 7a88f9f

rollback RDD.setContext(), use textFileStream() to test checkpointing
1 parent bd8a4c2 commit 7a88f9f

4 files changed: 28 additions & 41 deletions

core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
 
 private[spark] class ParallelCollectionRDD[T: ClassTag](
     @transient sc: SparkContext,
-    data: Seq[T],
+    @transient data: Seq[T],
     numSlices: Int,
     locationPrefs: Map[Int, Seq[String]])
   extends RDD[T](sc, Nil) {

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 0 additions & 8 deletions
@@ -82,14 +82,6 @@ abstract class RDD[T: ClassTag](
   def this(@transient oneParent: RDD[_]) =
     this(oneParent.context , List(new OneToOneDependency(oneParent)))
 
-  // setContext after loading from checkpointing
-  private[spark] def setContext(s: SparkContext) = {
-    if (sc != null && sc != s) {
-      throw new SparkException("Context is already set in " + this + ", cannot set it again")
-    }
-    sc = s
-  }
-
   private[spark] def conf = sc.conf
   // =======================================================================
   // Methods that should be implemented by subclasses of RDD

python/pyspark/streaming/tests.py

Lines changed: 27 additions & 25 deletions
@@ -70,7 +70,8 @@ def _collect(self, dstream):
 
         def get_output(_, rdd):
             r = rdd.collect()
-            result.append(r)
+            if r:
+                result.append(r)
         dstream.foreachRDD(get_output)
         return result
 
@@ -449,24 +450,18 @@ def test_queueStream(self):
         time.sleep(1)
         self.assertEqual(input, result[:3])
 
-    # TODO: fix this test
-    # def test_textFileStream(self):
-    #     input = [range(i) for i in range(3)]
-    #     dstream = self.ssc.queueStream(input)
-    #     d = os.path.join(tempfile.gettempdir(), str(id(self)))
-    #     if not os.path.exists(d):
-    #         os.makedirs(d)
-    #     dstream.saveAsTextFiles(os.path.join(d, 'test'))
-    #     self.ssc.start()
-    #     time.sleep(1)
-    #     self.ssc.stop(False, True)
-    #
-    #     self.ssc = StreamingContext(self.sc, self.batachDuration)
-    #     dstream2 = self.ssc.textFileStream(d)
-    #     result = self._collect(dstream2)
-    #     self.ssc.start()
-    #     time.sleep(2)
-    #     self.assertEqual(input, result[:3])
+    def test_textFileStream(self):
+        d = tempfile.mkdtemp()
+        self.ssc = StreamingContext(self.sc, self.duration)
+        dstream2 = self.ssc.textFileStream(d).map(int)
+        result = self._collect(dstream2)
+        self.ssc.start()
+        time.sleep(1)
+        for name in ('a', 'b'):
+            with open(os.path.join(d, name), "w") as f:
+                f.writelines(["%d\n" % i for i in range(10)])
+        time.sleep(2)
+        self.assertEqual([range(10) * 2], result[:3])
 
     def test_union(self):
         input = [range(i) for i in range(3)]
@@ -503,27 +498,34 @@ def tearDown(self):
 
     def test_get_or_create(self):
         result = [0]
+        inputd = tempfile.mkdtemp()
 
         def setup():
            conf = SparkConf().set("spark.default.parallelism", 1)
            sc = SparkContext(conf=conf)
            ssc = StreamingContext(sc, .2)
-           rdd = sc.parallelize(range(1), 1)
-           dstream = ssc.queueStream([rdd], default=rdd)
-           result[0] = self._collect(dstream.countByWindow(1, 0.2))
+           dstream = ssc.textFileStream(inputd)
+           result[0] = self._collect(dstream.count())
            return ssc
+
        tmpd = tempfile.mkdtemp("test_streaming_cps")
        ssc = StreamingContext.getOrCreate(tmpd, setup)
        ssc.start()
+       time.sleep(1)
+       with open(os.path.join(inputd, "1"), 'w') as f:
+           f.writelines(["%d\n" % i for i in range(10)])
        ssc.awaitTermination(4)
-       ssc.stop()
+       ssc.stop(True, True)
        expected = [[i * 1 + 1] for i in range(5)] + [[5]] * 5
-       self.assertEqual(expected, result[0][:10])
+       self.assertEqual([[10]], result[0][:1])
 
        ssc = StreamingContext.getOrCreate(tmpd, setup)
        ssc.start()
+       time.sleep(1)
+       with open(os.path.join(inputd, "1"), 'w') as f:
+           f.writelines(["%d\n" % i for i in range(10)])
        ssc.awaitTermination(2)
-       ssc.stop()
+       ssc.stop(True, True)
 
 
 if __name__ == "__main__":
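
Read outside diff form, the reworked test_get_or_create boils down to: build a context with textFileStream() inside a setup function, hand that function to StreamingContext.getOrCreate(), feed files into the watched directory, then stop and recover. Below is a minimal standalone sketch of that same pattern, assuming a local PySpark install; the names create_context, checkpoint_dir, input_dir and the batch interval are illustrative, not from the commit:

import os
import tempfile
import time

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

checkpoint_dir = tempfile.mkdtemp()  # where DStream metadata is persisted
input_dir = tempfile.mkdtemp()       # directory watched by textFileStream()

def create_context():
    # Only runs when no checkpoint exists; on recovery, getOrCreate()
    # rebuilds the DStream graph from the checkpoint instead.
    sc = SparkContext(conf=SparkConf().set("spark.default.parallelism", 1))
    ssc = StreamingContext(sc, 1)  # 1-second batches
    ssc.checkpoint(checkpoint_dir)
    ssc.textFileStream(input_dir).count().pprint()
    return ssc

ssc = StreamingContext.getOrCreate(checkpoint_dir, create_context)
ssc.start()
time.sleep(1)

# Files are only picked up if they appear after the stream has started.
with open(os.path.join(input_dir, "batch1.txt"), "w") as f:
    f.writelines("%d\n" % i for i in range(10))

ssc.awaitTermination(4)
ssc.stop(True, True)  # also stop the SparkContext, gracefully

# A second getOrCreate() against the same checkpoint directory now
# recovers the context instead of calling create_context() again,
# which is the path the second half of test_get_or_create exercises.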

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala

Lines changed: 0 additions & 7 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import scala.collection.mutable.Queue
@@ -33,12 +32,6 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {
 
-  private[streaming] override def setContext(s: StreamingContext) {
-    super.setContext(s)
-    queue.map(_.setContext(s.sparkContext))
-    defaultRDD.setContext(s.sparkContext)
-  }
-
   override def start() { }
 
   override def stop() { }
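
The rollback above removes the only caller of RDD.setContext(): re-binding queued RDDs to a new SparkContext after recovery. Without it, a queue-backed stream cannot be rebuilt from a checkpoint, since each queued RDD stays tied to the context that created it, which is why the Python tests switch to textFileStream() for checkpointing and keep queueStream() only for ordinary runs. A hedged sketch of that ordinary, non-checkpointed use (names illustrative):

import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="queue-demo")
ssc = StreamingContext(sc, 0.5)  # half-second batches

# Each queued RDD is bound to the SparkContext that created it, so this
# stream cannot survive checkpoint recovery; use textFileStream() when
# recovery itself is what you are testing.
rdds = [sc.parallelize(range(i + 1)) for i in range(3)]
ssc.queueStream(rdds).pprint()

ssc.start()
time.sleep(3)
ssc.stop(True, True)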
