Skip to content

Commit 2a06cdb

Browse files
committed
remove waste duplicated code
1 parent c5ecfc1 commit 2a06cdb

File tree

1 file changed

+2
-4
lines changed

1 file changed

+2
-4
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
from collections import defaultdict
1919
from itertools import chain, ifilter, imap
20-
import time
2120
import operator
2221

2322
from pyspark.serializers import NoOpSerializer,\
@@ -246,8 +245,6 @@ def takeAndPrint(rdd, time):
246245
taken = rdd.take(11)
247246
print "-------------------------------------------"
248247
print "Time: %s" % (str(time))
249-
print rdd.glom().collect()
250-
print "-------------------------------------------"
251248
print "-------------------------------------------"
252249
for record in taken[:10]:
253250
print record
@@ -447,6 +444,7 @@ def pipeline_func(split, iterator):
447444
self._prev_jdstream = prev._prev_jdstream # maintain the pipeline
448445
self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
449446
self.is_cached = False
447+
self.is_checkpointed = False
450448
self._ssc = prev._ssc
451449
self.ctx = prev.ctx
452450
self.prev = prev
@@ -483,4 +481,4 @@ def _jdstream(self):
483481
return self._jdstream_val
484482

485483
def _is_pipelinable(self):
486-
return not self.is_cached
484+
return not (self.is_cached or self.is_checkpointed)

0 commit comments

Comments
 (0)