@@ -59,8 +59,7 @@ class SparkContext(object):
     _writeToFile = None
     _next_accum_id = 0
     _active_spark_context = None
-    _lock = Lock()
-    # zip and egg files that need to be added to PYTHONPATH
+    _lock = Lock()  # zip and egg files that need to be added to PYTHONPATH
     _python_includes = None
     _default_batch_size_for_serialized_input = 10
 
@@ -101,15 +100,13 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
             self._callsite = rdd._extract_concise_traceback()
         else:
             tempNamedTuple = namedtuple("Callsite", "function file linenum")
-            self._callsite = tempNamedTuple(
-                function=None, file=None, linenum=None)
+            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
         SparkContext._ensure_initialized(self, gateway=gateway)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                           conf)
         except:
-            # If an error occurs, clean up in order to allow future
-            # SparkContext creation:
+            # If an error occurs, clean up in order to allow future SparkContext creation:
             self.stop()
             raise
 
@@ -142,8 +139,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         if not self._conf.contains("spark.master"):
             raise Exception("A master URL must be set in your configuration")
         if not self._conf.contains("spark.app.name"):
-            raise Exception(
-                "An application name must be set in your configuration")
+            raise Exception("An application name must be set in your configuration")
 
         # Read back our properties from the conf in case we loaded some of them from
         # the classpath or an external config file
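
Illustrative sketch (not part of this diff): the two checks above require a master URL and an application name before the context can start. A minimal configuration that satisfies them, assuming a local Spark installation:

    from pyspark import SparkConf, SparkContext

    # Both settings below are what the checks in _do_init() look for.
    conf = SparkConf().setMaster("local[2]").setAppName("example")
    sc = SparkContext(conf=conf)
    # ... use sc ...
    sc.stop()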
@@ -184,8 +180,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self.addPyFile(path)
 
         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them to the
-        # PYTHONPATH
+        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
@@ -195,11 +190,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                     sys.path.append(dirname)
 
         # Create a temporary directory inside spark.local.dir:
-        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(
-            self._jsc.sc().conf())
+        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
         self._temp_dir = \
-            self._jvm.org.apache.spark.util.Utils.createTempDir(
-                local_dir).getAbsolutePath()
+            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
     def _initialize_context(self, jconf):
         """
@@ -292,8 +285,7 @@ def parallelize(self, c, numSlices=None):
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
         tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        # Make sure we distribute data evenly if it's smaller than
-        # self.batchSize
+        # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(c):
             c = list(c)  # Make it a list so we can compute its length
         batchSize = min(len(c) // numSlices, self._batchSize)
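
Illustrative sketch (not part of this diff): the comment joined above refers to the batching arithmetic in parallelize(). A worked example with assumed values:

    # 10 elements across 4 slices, with a hypothetical _batchSize of 1024.
    c = list(range(10))
    numSlices = 4
    _batchSize = 1024
    batchSize = min(len(c) // numSlices, _batchSize)
    print(batchSize)  # 2 -- a small input gets a small batch so every slice receives data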
@@ -412,10 +404,8 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                 keyConverter, valueConverter, minSplits, batchSize)
         return RDD(jrdd, self, ser)
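
Illustrative sketch (not part of this diff): the same two joined lines -- clamp batchSize, then pick a serializer -- recur in the newAPIHadoopFile/newAPIHadoopRDD/hadoopFile/hadoopRDD hunks below. The selection logic in isolation, with assumed values:

    from pyspark.serializers import BatchedSerializer, PickleSerializer

    _default_batch_size_for_serialized_input = 10  # class default shown earlier in this diff
    batchSize = None                               # caller did not pass a batch size

    batchSize = max(1, batchSize or _default_batch_size_for_serialized_input)
    ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
    # batchSize == 10, so records are pickled in batches; batchSize=1 would
    # fall back to a plain PickleSerializer.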
@@ -445,13 +435,11 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(
-            self._jsc, path, inputFormatClass, keyClass,
-            valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                                    valueClass, keyConverter, valueConverter,
+                                                    jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -476,13 +464,11 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(
-            self._jsc, inputFormatClass, keyClass,
-            valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                                   valueClass, keyConverter, valueConverter,
+                                                   jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -510,13 +496,11 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopFile(
-            self._jsc, path, inputFormatClass, keyClass,
-            valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                              valueClass, keyConverter, valueConverter,
+                                              jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -541,12 +525,11 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(
-            1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (
-            batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                                             keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                             valueClass, keyConverter, valueConverter,
+                                             jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def _checkpointFile(self, name, input_deserializer):
@@ -577,8 +560,7 @@ def union(self, rdds):
         first = rdds[0]._jrdd
         rest = [x._jrdd for x in rdds[1:]]
         rest = ListConverter().convert(rest, self._gateway._gateway_client)
-        return RDD(self._jsc.union(first, rest), self,
-                   rdds[0]._jrdd_deserializer)
+        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)
 
     def broadcast(self, value):
         """
@@ -590,8 +572,7 @@ def broadcast(self, value):
         pickleSer = PickleSerializer()
         pickled = pickleSer.dumps(value)
         jbroadcast = self._jsc.broadcast(bytearray(pickled))
-        return Broadcast(jbroadcast.id(), value, jbroadcast,
-                         self._pickled_broadcast_vars)
+        return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)
 
     def accumulator(self, value, accum_param=None):
         """
@@ -609,8 +590,7 @@ def accumulator(self, value, accum_param=None):
         elif isinstance(value, complex):
             accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
         else:
-            raise Exception(
-                "No default accumulator param for type %s" % type(value))
+            raise Exception("No default accumulator param for type %s" % type(value))
         SparkContext._next_accum_id += 1
         return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)
 
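
Illustrative usage of accumulator(), which picks a default AccumulatorParam from the value's type as the else branch above shows (a minimal sketch, not part of this diff; assumes a local Spark installation):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "accumulator-example")
    total = sc.accumulator(0)     # int value, so the integer accumulator param is chosen
    sc.parallelize([1, 2, 3, 4]).foreach(lambda x: total.add(x))
    print(total.value)            # 10
    sc.stop()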
@@ -655,14 +635,12 @@ def addPyFile(self, path):
         HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        # dirname may be directory or HDFS/S3 prefix
-        (dirname, filename) = os.path.split(path)
+        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
 
         if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
             self._python_includes.append(filename)
             # for tests in local mode
-            sys.path.append(
-                os.path.join(SparkFiles.getRootDirectory(), filename))
+            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))
 
     def setCheckpointDir(self, dirName):
         """
@@ -676,8 +654,7 @@ def _getJavaStorageLevel(self, storageLevel):
         Returns a Java StorageLevel based on a pyspark.StorageLevel.
         """
         if not isinstance(storageLevel, StorageLevel):
-            raise Exception(
-                "storageLevel must be of type pyspark.StorageLevel")
+            raise Exception("storageLevel must be of type pyspark.StorageLevel")
 
         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
         return newStorageLevel(storageLevel.useDisk,
@@ -780,15 +757,13 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         """
         if partitions is None:
             partitions = range(rdd._jrdd.partitions().size())
-        javaPartitions = ListConverter().convert(
-            partitions, self._gateway._gateway_client)
+        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)
 
         # Implementation note: This is implemented as a mapPartitions followed
         # by runJob() in order to avoid having to pass a Python lambda into
         # SparkContext#runJob.
         mappedRDD = rdd.mapPartitions(partitionFunc)
-        it = self._jvm.PythonRDD.runJob(
-            self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
+        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
 
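
Illustrative usage of runJob(), which wraps partitionFunc in a mapPartitions per the implementation note above (a minimal sketch, not part of this diff; assumes a local Spark installation):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "runJob-example")
    rdd = sc.parallelize(range(8), 4)
    # Sum only the first two partitions; one result per partition.
    print(sc.runJob(rdd, lambda it: [sum(it)], partitions=[0, 1]))  # [1, 5]
    sc.stop()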
@@ -800,8 +775,7 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(
-        globs=globs, optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)