@@ -539,8 +539,7 @@ def test_sequencefiles(self):
539
539
540
540
ed = [(1.0 , u'aa' ), (1.0 , u'aa' ), (2.0 , u'aa' ), (2.0 , u'bb' ), (2.0 , u'bb' ), (3.0 , u'cc' )]
541
541
self .sc .parallelize (ed ).saveAsSequenceFile (basepath + "/sfdouble/" )
542
- doubles = sorted (
543
- self .sc .sequenceFile (basepath + "/sfdouble/" ).collect ())
542
+ doubles = sorted (self .sc .sequenceFile (basepath + "/sfdouble/" ).collect ())
544
543
self .assertEqual (doubles , ed )
545
544
546
545
ebs = [(1 , bytearray (b'\x00 \x07 spam\x08 ' )), (2 , bytearray (b'\x00 \x07 spam\x08 ' ))]
@@ -725,25 +724,25 @@ def test_unbatched_save_and_read(self):
725
724
self .sc .parallelize (ei , numSlices = len (ei )).saveAsSequenceFile (
726
725
basepath + "/unbatched/" )
727
726
728
- unbatched_sequence = sorted (self .sc .sequenceFile (basepath + "/unbatched/" ,
729
- batchSize = 1 ).collect ())
727
+ unbatched_sequence = sorted (self .sc .sequenceFile (
728
+ basepath + "/unbatched/" ,
729
+ batchSize = 1 ).collect ())
730
730
self .assertEqual (unbatched_sequence , ei )
731
731
732
- unbatched_hadoopFile = sorted (
733
- self . sc . hadoopFile ( basepath + "/unbatched/" ,
734
- "org.apache.hadoop.mapred.SequenceFileInputFormat" ,
735
- "org.apache.hadoop.io.IntWritable" ,
736
- "org.apache.hadoop.io.Text" ,
737
- batchSize = 1 ).collect ())
732
+ unbatched_hadoopFile = sorted (self . sc . hadoopFile (
733
+ basepath + "/unbatched/" ,
734
+ "org.apache.hadoop.mapred.SequenceFileInputFormat" ,
735
+ "org.apache.hadoop.io.IntWritable" ,
736
+ "org.apache.hadoop.io.Text" ,
737
+ batchSize = 1 ).collect ())
738
738
self .assertEqual (unbatched_hadoopFile , ei )
739
739
740
- unbatched_newAPIHadoopFile = sorted (
741
- self .sc .newAPIHadoopFile (
742
- basepath + "/unbatched/" ,
743
- "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" ,
744
- "org.apache.hadoop.io.IntWritable" ,
745
- "org.apache.hadoop.io.Text" ,
746
- batchSize = 1 ).collect ())
740
+ unbatched_newAPIHadoopFile = sorted (self .sc .newAPIHadoopFile (
741
+ basepath + "/unbatched/" ,
742
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" ,
743
+ "org.apache.hadoop.io.IntWritable" ,
744
+ "org.apache.hadoop.io.Text" ,
745
+ batchSize = 1 ).collect ())
747
746
self .assertEqual (unbatched_newAPIHadoopFile , ei )
748
747
749
748
oldconf = {"mapred.input.dir" : basepath + "/unbatched/" }
@@ -949,9 +948,8 @@ def test_module_dependency(self):
949
948
|def myfunc(x):
950
949
| return x + 1
951
950
""" )
952
- proc = subprocess .Popen (
953
- [self .sparkSubmit , "--py-files" , zip , script ],
954
- stdout = subprocess .PIPE )
951
+ proc = subprocess .Popen ([self .sparkSubmit , "--py-files" , zip , script ],
952
+ stdout = subprocess .PIPE )
955
953
out , err = proc .communicate ()
956
954
self .assertEqual (0 , proc .returncode )
957
955
self .assertIn ("[2, 3, 4]" , out )
@@ -969,10 +967,9 @@ def test_module_dependency_on_cluster(self):
969
967
|def myfunc(x):
970
968
| return x + 1
971
969
""" )
972
- proc = subprocess .Popen (
973
- [self .sparkSubmit , "--py-files" , zip , "--master" ,
974
- "local-cluster[1,1,512]" , script ],
975
- stdout = subprocess .PIPE )
970
+ proc = subprocess .Popen ([self .sparkSubmit , "--py-files" , zip , "--master" ,
971
+ "local-cluster[1,1,512]" , script ],
972
+ stdout = subprocess .PIPE )
976
973
out , err = proc .communicate ()
977
974
self .assertEqual (0 , proc .returncode )
978
975
self .assertIn ("[2, 3, 4]" , out )
0 commit comments