File tree Expand file tree Collapse file tree 1 file changed +2
-4
lines changed Expand file tree Collapse file tree 1 file changed +2
-4
lines changed Original file line number Diff line number Diff line change 17
17
18
18
from collections import defaultdict
19
19
from itertools import chain , ifilter , imap
20
- import time
21
20
import operator
22
21
23
22
from pyspark .serializers import NoOpSerializer ,\
@@ -246,8 +245,6 @@ def takeAndPrint(rdd, time):
246
245
taken = rdd .take (11 )
247
246
print "-------------------------------------------"
248
247
print "Time: %s" % (str (time ))
249
- print rdd .glom ().collect ()
250
- print "-------------------------------------------"
251
248
print "-------------------------------------------"
252
249
for record in taken [:10 ]:
253
250
print record
@@ -447,6 +444,7 @@ def pipeline_func(split, iterator):
447
444
self ._prev_jdstream = prev ._prev_jdstream # maintain the pipeline
448
445
self ._prev_jrdd_deserializer = prev ._prev_jrdd_deserializer
449
446
self .is_cached = False
447
+ self .is_checkpointed = False
450
448
self ._ssc = prev ._ssc
451
449
self .ctx = prev .ctx
452
450
self .prev = prev
@@ -483,4 +481,4 @@ def _jdstream(self):
483
481
return self ._jdstream_val
484
482
485
483
def _is_pipelinable (self ):
486
- return not self .is_cached
484
+ return not ( self .is_cached or self . is_checkpointed )
You can’t perform that action at this time.
0 commit comments