 class SparkContext(object):
+
     """
     Main entry point for Spark functionality. A SparkContext represents the
     connection to a Spark cluster, and can be used to create L{RDD}s and
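For orientation, a minimal usage sketch of the class touched in this diff; the master string and application name below are illustrative:

    from pyspark import SparkContext

    # Run locally with 4 worker threads; the app name is arbitrary.
    sc = SparkContext("local[4]", "PythonExample")
    rdd = sc.parallelize([1, 2, 3, 4])
    print(rdd.map(lambda x: x * 2).collect())   # [2, 4, 6, 8]
    sc.stop()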
@@ -59,7 +60,8 @@ class SparkContext(object):
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
-    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    # zip and egg files that need to be added to PYTHONPATH
+    _python_includes = None
     _default_batch_size_for_serialized_input = 10

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@@ -99,13 +101,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
             self._callsite = rdd._extract_concise_traceback()
         else:
             tempNamedTuple = namedtuple("Callsite", "function file linenum")
-            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
+            self._callsite = tempNamedTuple(
+                function=None, file=None, linenum=None)
         SparkContext._ensure_initialized(self, gateway=gateway)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                           conf)
         except:
-            # If an error occurs, clean up in order to allow future SparkContext creation:
+            # If an error occurs, clean up in order to allow future
+            # SparkContext creation:
             self.stop()
             raise

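The comment being rewrapped documents the cleanup contract: stop() in the except branch unregisters the half-built context so a later construction can succeed. A hedged sketch of that behaviour, assuming stop() clears the active-context slot as the comment implies:

    from pyspark import SparkConf, SparkContext

    try:
        # No app name set, so _do_init raises; the except branch calls stop()
        # before re-raising, leaving no stale active context behind.
        sc = SparkContext(conf=SparkConf().setMaster("local"))
    except Exception as e:
        print("first attempt failed: %s" % e)

    # A corrected second attempt can now be made.
    sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("Retry"))
    sc.stop()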
@@ -138,7 +142,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         if not self._conf.contains("spark.master"):
             raise Exception("A master URL must be set in your configuration")
         if not self._conf.contains("spark.app.name"):
-            raise Exception("An application name must be set in your configuration")
+            raise Exception(
+                "An application name must be set in your configuration")

         # Read back our properties from the conf in case we loaded some of them from
         # the classpath or an external config file
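For reference, the two checks above are normally satisfied either by the master/appName constructor arguments or by a SparkConf; a short sketch with illustrative values:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setMaster("local[2]")        # provides spark.master
            .setAppName("ConfExample"))   # provides spark.app.name
    sc = SparkContext(conf=conf)
    sc.stop()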
@@ -179,7 +184,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                 self.addPyFile(path)

         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
+        # with SparkContext.addFile, so we just need to add them to the
+        # PYTHONPATH
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
@@ -189,9 +195,11 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                 sys.path.append(dirname)

         # Create a temporary directory inside spark.local.dir:
-        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
+        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(
+            self._jsc.sc().conf())
         self._temp_dir = \
-            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
+            self._jvm.org.apache.spark.util.Utils.createTempDir(
+                local_dir).getAbsolutePath()

     def _initialize_context(self, jconf):
         """
@@ -213,7 +221,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
 
         if instance:
             if (SparkContext._active_spark_context and
-                    SparkContext._active_spark_context != instance):
+                        SparkContext._active_spark_context != instance):
                 currentMaster = SparkContext._active_spark_context.master
                 currentAppName = SparkContext._active_spark_context.appName
                 callsite = SparkContext._active_spark_context._callsite
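The branch above fires when a second SparkContext is created while another is still active. A rough sketch of what that looks like from user code, assuming the version that raises ValueError rather than merely warning:

    from pyspark import SparkContext

    sc1 = SparkContext("local", "First")
    try:
        sc2 = SparkContext("local", "Second")   # rejected while sc1 is active
    except ValueError as e:
        print(e)   # message includes the existing master, app name and call site
    sc1.stop()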
@@ -284,7 +292,8 @@ def parallelize(self, c, numSlices=None):
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
         tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        # Make sure we distribute data evenly if it's smaller than self.batchSize
+        # Make sure we distribute data evenly if it's smaller than
+        # self.batchSize
         if "__len__" not in dir(c):
             c = list(c)    # Make it a list so we can compute its length
         batchSize = min(len(c) // numSlices, self._batchSize)
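A usage sketch for parallelize(); the element values and slice count are illustrative. As the comments above note, the elements are pickled in batches to a temporary file rather than pushed through Py4J one at a time:

    from pyspark import SparkContext

    sc = SparkContext("local", "ParallelizeExample")
    rdd = sc.parallelize(range(10), numSlices=4)   # 4 partitions
    print(rdd.glom().collect())   # e.g. [[0, 1], [2, 3], [4, 5, 6], [7, 8, 9]]
    sc.stop()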
@@ -403,10 +412,12 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
-                                                    keyConverter, valueConverter, minSplits, batchSize)
+                                                keyConverter, valueConverter, minSplits, batchSize)
         return RDD(jrdd, self, ser)

     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
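A hedged usage sketch for sequenceFile(); the HDFS path is hypothetical, the Writable class names are standard Hadoop ones, and an explicit batchSize is shown only to illustrate the parameter handled above:

    # Assumes an existing SparkContext `sc` and a SequenceFile of
    # (Text, IntWritable) pairs at the (hypothetical) path below.
    rdd = sc.sequenceFile(
        "hdfs:///data/example.seq",
        keyClass="org.apache.hadoop.io.Text",
        valueClass="org.apache.hadoop.io.IntWritable",
        batchSize=10)
    print(rdd.first())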
@@ -434,10 +445,13 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(
+            self._jsc, path, inputFormatClass, keyClass,
+            valueClass, keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -462,10 +476,13 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
-                                                   valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(
+            self._jsc, inputFormatClass, keyClass,
+            valueClass, keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -493,10 +510,13 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                              valueClass, keyConverter, valueConverter, jconf, batchSize)
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
+        jrdd = self._jvm.PythonRDD.hadoopFile(
+            self._jsc, path, inputFormatClass, keyClass,
+            valueClass, keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -521,10 +541,12 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        batchSize = max(
+            1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (
+            batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                                                 keyConverter, valueConverter, jconf, batchSize)
+                                             keyConverter, valueConverter, jconf, batchSize)
         return RDD(jrdd, self, ser)

     def _checkpointFile(self, name, input_deserializer):
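The conf-based readers (hadoopRDD / newAPIHadoopRDD) take a Hadoop configuration dict instead of a path argument. A hedged sketch; the property key is the old-API input directory setting and the path is hypothetical:

    # Assumes an existing SparkContext `sc`.
    conf = {"mapred.input.dir": "hdfs:///data/old-api-input"}
    rdd = sc.hadoopRDD(
        inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
        keyClass="org.apache.hadoop.io.LongWritable",
        valueClass="org.apache.hadoop.io.Text",
        conf=conf)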
@@ -587,7 +609,8 @@ def accumulator(self, value, accum_param=None):
         elif isinstance(value, complex):
             accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
         else:
-            raise Exception("No default accumulator param for type %s" % type(value))
+            raise Exception(
+                "No default accumulator param for type %s" % type(value))
         SparkContext._next_accum_id += 1
         return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

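A usage sketch for accumulator(): int, float and complex values pick up a default AccumulatorParam, while any other type must supply accum_param or hit the exception rewrapped above:

    from pyspark import SparkContext

    sc = SparkContext("local", "AccumulatorExample")
    acc = sc.accumulator(0)   # int gets a default AccumulatorParam
    sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
    print(acc.value)          # 10

    # A list has no default param; without a custom accum_param this would
    # raise the "No default accumulator param" exception:
    # sc.accumulator([])
    sc.stop()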
@@ -632,12 +655,14 @@ def addPyFile(self, path):
         HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
+        # dirname may be directory or HDFS/S3 prefix
+        (dirname, filename) = os.path.split(path)

         if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
             self._python_includes.append(filename)
             # for tests in local mode
-            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))
+            sys.path.append(
+                os.path.join(SparkFiles.getRootDirectory(), filename))

     def setCheckpointDir(self, dirName):
         """
@@ -651,7 +676,8 @@ def _getJavaStorageLevel(self, storageLevel):
         Returns a Java StorageLevel based on a pyspark.StorageLevel.
         """
         if not isinstance(storageLevel, StorageLevel):
-            raise Exception("storageLevel must be of type pyspark.StorageLevel")
+            raise Exception(
+                "storageLevel must be of type pyspark.StorageLevel")

         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
         return newStorageLevel(storageLevel.useDisk,
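_getJavaStorageLevel() backs persist(); callers pass a pyspark.StorageLevel, and anything else triggers the exception rewrapped above. A short sketch:

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext("local", "StorageLevelExample")
    rdd = sc.parallelize(range(1000))
    rdd.persist(StorageLevel.MEMORY_AND_DISK)   # translated to a Java StorageLevel
    # rdd.persist("MEMORY_AND_DISK")            # wrong type -> exception above
    sc.stop()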
@@ -754,13 +780,15 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
         """
         if partitions is None:
             partitions = range(rdd._jrdd.partitions().size())
-        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)
+        javaPartitions = ListConverter().convert(
+            partitions, self._gateway._gateway_client)

         # Implementation note: This is implemented as a mapPartitions followed
         # by runJob() in order to avoid having to pass a Python lambda into
         # SparkContext#runJob.
         mappedRDD = rdd.mapPartitions(partitionFunc)
-        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
+        it = self._jvm.PythonRDD.runJob(
+            self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))

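A usage sketch for runJob(); values are illustrative. The partition function is applied via mapPartitions, so it receives an iterator per selected partition and must return an iterable:

    from pyspark import SparkContext

    sc = SparkContext("local[4]", "RunJobExample")
    rdd = sc.parallelize(range(8), 4)
    # Sum only partitions 0 and 2 of the four partitions.
    print(sc.runJob(rdd, lambda it: [sum(it)], partitions=[0, 2]))   # e.g. [1, 9]
    sc.stop()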
@@ -772,7 +800,8 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(
+        globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
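_test() wires a live local SparkContext into the doctest globals and runs the module's doctests with the ELLIPSIS flag. A self-contained sketch of that doctest pattern (the sample function is purely illustrative):

    import doctest

    def sample():
        """
        >>> print(list(range(20)))   # doctest: +ELLIPSIS
        [0, 1, 2, ..., 19]
        """

    if __name__ == "__main__":
        (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
        if failure_count:
            exit(-1)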