@@ -532,6 +532,9 @@ abstract class DStream[T: ClassTag] (
532
532
* 'this' DStream will be registered as an output stream and therefore materialized.
533
533
*/
534
534
def foreachRDD (foreachFunc : (RDD [T ], Time ) => Unit ) {
535
+ // because the DStream is reachable from the outer object here, and because
536
+ // DStreams can't be serialized with closures, we can't proactively check
537
+ // it for serializability and so we pass the optional false to SparkContext.clean
535
538
new ForEachDStream (this , context.sparkContext.clean(foreachFunc, false )).register()
536
539
}
537
540
@@ -540,6 +543,9 @@ abstract class DStream[T: ClassTag] (
540
543
* on each RDD of 'this' DStream.
541
544
*/
542
545
def transform [U : ClassTag ](transformFunc : RDD [T ] => RDD [U ]): DStream [U ] = {
546
+ // because the DStream is reachable from the outer object here, and because
547
+ // DStreams can't be serialized with closures, we can't proactively check
548
+ // it for serializability and so we pass the optional false to SparkContext.clean
543
549
transform((r : RDD [T ], t : Time ) => context.sparkContext.clean(transformFunc(r), false ))
544
550
}
545
551
@@ -548,6 +554,9 @@ abstract class DStream[T: ClassTag] (
548
554
* on each RDD of 'this' DStream.
549
555
*/
550
556
def transform [U : ClassTag ](transformFunc : (RDD [T ], Time ) => RDD [U ]): DStream [U ] = {
557
+ // because the DStream is reachable from the outer object here, and because
558
+ // DStreams can't be serialized with closures, we can't proactively check
559
+ // it for serializability and so we pass the optional false to SparkContext.clean
551
560
val cleanedF = context.sparkContext.clean(transformFunc, false )
552
561
val realTransformFunc = (rdds : Seq [RDD [_]], time : Time ) => {
553
562
assert(rdds.length == 1 )
@@ -563,6 +572,9 @@ abstract class DStream[T: ClassTag] (
563
572
def transformWith [U : ClassTag , V : ClassTag ](
564
573
other : DStream [U ], transformFunc : (RDD [T ], RDD [U ]) => RDD [V ]
565
574
): DStream [V ] = {
575
+ // because the DStream is reachable from the outer object here, and because
576
+ // DStreams can't be serialized with closures, we can't proactively check
577
+ // it for serializability and so we pass the optional false to SparkContext.clean
566
578
val cleanedF = ssc.sparkContext.clean(transformFunc, false )
567
579
transformWith(other, (rdd1 : RDD [T ], rdd2 : RDD [U ], time : Time ) => cleanedF(rdd1, rdd2))
568
580
}
@@ -574,6 +586,9 @@ abstract class DStream[T: ClassTag] (
574
586
def transformWith [U : ClassTag , V : ClassTag ](
575
587
other : DStream [U ], transformFunc : (RDD [T ], RDD [U ], Time ) => RDD [V ]
576
588
): DStream [V ] = {
589
+ // because the DStream is reachable from the outer object here, and because
590
+ // DStreams can't be serialized with closures, we can't proactively check
591
+ // it for serializability and so we pass the optional false to SparkContext.clean
577
592
val cleanedF = ssc.sparkContext.clean(transformFunc, false )
578
593
val realTransformFunc = (rdds : Seq [RDD [_]], time : Time ) => {
579
594
assert(rdds.length == 2 )
0 commit comments