Skip to content

Commit 328139b

Browse files
author
Andrew Or
committed
Do not forget foreachRDD
1 parent 5431f61 commit 328139b

File tree

1 file changed

+2
-1
lines changed
  • streaming/src/main/scala/org/apache/spark/streaming/dstream

1 file changed

+2
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,8 @@ abstract class DStream[T: ClassTag] (
624624
* 'this' DStream will be registered as an output stream and therefore materialized.
625625
*/
626626
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
627-
this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
627+
val cleanedF = context.sparkContext.clean(foreachFunc)
628+
this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
628629
}
629630

630631
/**

0 commit comments

Comments
 (0)