@@ -315,17 +315,14 @@ abstract class RDD[T: ClassTag](
315
315
* Return a new RDD containing the distinct elements in this RDD.
316
316
*/
317
317
@ RDDScope
318
- def distinct (numPartitions : Int )(implicit ord : Ordering [T ] = null ): RDD [T ] = {
318
+ def distinct (numPartitions : Int )(implicit ord : Ordering [T ] = null ): RDD [T ] =
319
319
map(x => (x, null )).reduceByKey((x, y) => x, numPartitions).map(_._1)
320
- }
321
320
322
321
/**
323
322
* Return a new RDD containing the distinct elements in this RDD.
324
323
*/
325
324
@ RDDScope
326
- def distinct (): RDD [T ] = {
327
- distinct(partitions.length)
328
- }
325
+ def distinct (): RDD [T ] = distinct(partitions.length)
329
326
330
327
/**
331
328
* Return a new RDD that has exactly numPartitions partitions.
@@ -362,10 +359,8 @@ abstract class RDD[T: ClassTag](
362
359
* data distributed using a hash partitioner.
363
360
*/
364
361
@ RDDScope
365
- def coalesce (
366
- numPartitions : Int ,
367
- shuffle : Boolean = false )
368
- (implicit ord : Ordering [T ] = null ): RDD [T ] = {
362
+ def coalesce (numPartitions : Int , shuffle : Boolean = false )(implicit ord : Ordering [T ] = null )
363
+ : RDD [T ] = {
369
364
if (shuffle) {
370
365
/** Distributes elements evenly across output partitions, starting from a random partition. */
371
366
val distributePartition = (index : Int , items : Iterator [T ]) => {
@@ -419,9 +414,7 @@ abstract class RDD[T: ClassTag](
419
414
* @return split RDDs in an array
420
415
*/
421
416
@ RDDScope
422
- def randomSplit (
423
- weights : Array [Double ],
424
- seed : Long = Utils .random.nextLong): Array [RDD [T ]] = {
417
+ def randomSplit (weights : Array [Double ], seed : Long = Utils .random.nextLong): Array [RDD [T ]] = {
425
418
val sum = weights.sum
426
419
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d )(_ + _)
427
420
normalizedCumWeights.sliding(2 ).map { x =>
@@ -502,9 +495,7 @@ abstract class RDD[T: ClassTag](
502
495
* times (use `.distinct()` to eliminate them).
503
496
*/
504
497
@ RDDScope
505
- def ++ (other : RDD [T ]): RDD [T ] = {
506
- this .union(other)
507
- }
498
+ def ++ (other : RDD [T ]): RDD [T ] = this .union(other)
508
499
509
500
/**
510
501
* Return this RDD sorted by the given key function.
@@ -514,11 +505,10 @@ abstract class RDD[T: ClassTag](
514
505
f : (T ) => K ,
515
506
ascending : Boolean = true ,
516
507
numPartitions : Int = this .partitions.length)
517
- (implicit ord : Ordering [K ], ctag : ClassTag [K ]): RDD [T ] = {
508
+ (implicit ord : Ordering [K ], ctag : ClassTag [K ]): RDD [T ] =
518
509
this .keyBy[K ](f)
519
- .sortByKey(ascending, numPartitions)
520
- .values
521
- }
510
+ .sortByKey(ascending, numPartitions)
511
+ .values
522
512
523
513
/**
524
514
* Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -529,8 +519,8 @@ abstract class RDD[T: ClassTag](
529
519
@ RDDScope
530
520
def intersection (other : RDD [T ]): RDD [T ] = {
531
521
this .map(v => (v, null )).cogroup(other.map(v => (v, null )))
532
- .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
533
- .keys
522
+ .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
523
+ .keys
534
524
}
535
525
536
526
/**
@@ -542,13 +532,11 @@ abstract class RDD[T: ClassTag](
542
532
* @param partitioner Partitioner to use for the resulting RDD
543
533
*/
544
534
@ RDDScope
545
- def intersection (
546
- other : RDD [T ],
547
- partitioner : Partitioner )
548
- (implicit ord : Ordering [T ] = null ): RDD [T ] = {
535
+ def intersection (other : RDD [T ], partitioner : Partitioner )(implicit ord : Ordering [T ] = null )
536
+ : RDD [T ] = {
549
537
this .map(v => (v, null )).cogroup(other.map(v => (v, null )), partitioner)
550
- .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
551
- .keys
538
+ .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
539
+ .keys
552
540
}
553
541
554
542
/**
@@ -577,9 +565,7 @@ abstract class RDD[T: ClassTag](
577
565
* elements (a, b) where a is in `this` and b is in `other`.
578
566
*/
579
567
@ RDDScope
580
- def cartesian [U : ClassTag ](other : RDD [U ]): RDD [(T , U )] = {
581
- new CartesianRDD (sc, this , other)
582
- }
568
+ def cartesian [U : ClassTag ](other : RDD [U ]): RDD [(T , U )] = new CartesianRDD (sc, this , other)
583
569
584
570
/**
585
571
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -591,9 +577,8 @@ abstract class RDD[T: ClassTag](
591
577
* or [[PairRDDFunctions.reduceByKey ]] will provide much better performance.
592
578
*/
593
579
@ RDDScope
594
- def groupBy [K ](f : T => K )(implicit kt : ClassTag [K ]): RDD [(K , Iterable [T ])] = {
580
+ def groupBy [K ](f : T => K )(implicit kt : ClassTag [K ]): RDD [(K , Iterable [T ])] =
595
581
groupBy[K ](f, defaultPartitioner(this ))
596
- }
597
582
598
583
/**
599
584
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
@@ -605,11 +590,8 @@ abstract class RDD[T: ClassTag](
605
590
* or [[PairRDDFunctions.reduceByKey ]] will provide much better performance.
606
591
*/
607
592
@ RDDScope
608
- def groupBy [K ](
609
- f : T => K ,
610
- numPartitions : Int )(implicit kt : ClassTag [K ]): RDD [(K , Iterable [T ])] = {
593
+ def groupBy [K ](f : T => K , numPartitions : Int )(implicit kt : ClassTag [K ]): RDD [(K , Iterable [T ])] =
611
594
groupBy(f, new HashPartitioner (numPartitions))
612
- }
613
595
614
596
/**
615
597
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -621,10 +603,8 @@ abstract class RDD[T: ClassTag](
621
603
* or [[PairRDDFunctions.reduceByKey ]] will provide much better performance.
622
604
*/
623
605
@ RDDScope
624
- def groupBy [K ](
625
- f : T => K ,
626
- p : Partitioner )
627
- (implicit kt : ClassTag [K ], ord : Ordering [K ] = null ): RDD [(K , Iterable [T ])] = {
606
+ def groupBy [K ](f : T => K , p : Partitioner )(implicit kt : ClassTag [K ], ord : Ordering [K ] = null )
607
+ : RDD [(K , Iterable [T ])] = {
628
608
val cleanF = sc.clean(f)
629
609
this .map(t => (cleanF(t), t)).groupByKey(p)
630
610
}
@@ -633,17 +613,14 @@ abstract class RDD[T: ClassTag](
633
613
* Return an RDD created by piping elements to a forked external process.
634
614
*/
635
615
@ RDDScope
636
- def pipe (command : String ): RDD [String ] = {
637
- new PipedRDD (this , command)
638
- }
616
+ def pipe (command : String ): RDD [String ] = new PipedRDD (this , command)
639
617
640
618
/**
641
619
* Return an RDD created by piping elements to a forked external process.
642
620
*/
643
621
@ RDDScope
644
- def pipe (command : String , env : Map [String , String ]): RDD [String ] = {
622
+ def pipe (command : String , env : Map [String , String ]): RDD [String ] =
645
623
new PipedRDD (this , command, env)
646
- }
647
624
648
625
/**
649
626
* Return an RDD created by piping elements to a forked external process.
@@ -685,8 +662,7 @@ abstract class RDD[T: ClassTag](
685
662
*/
686
663
@ RDDScope
687
664
def mapPartitions [U : ClassTag ](
688
- f : Iterator [T ] => Iterator [U ],
689
- preservesPartitioning : Boolean = false ): RDD [U ] = {
665
+ f : Iterator [T ] => Iterator [U ], preservesPartitioning : Boolean = false ): RDD [U ] = {
690
666
val func = (context : TaskContext , index : Int , iter : Iterator [T ]) => f(iter)
691
667
new MapPartitionsRDD (this , sc.clean(func), preservesPartitioning)
692
668
}
@@ -700,8 +676,7 @@ abstract class RDD[T: ClassTag](
700
676
*/
701
677
@ RDDScope
702
678
def mapPartitionsWithIndex [U : ClassTag ](
703
- f : (Int , Iterator [T ]) => Iterator [U ],
704
- preservesPartitioning : Boolean = false ): RDD [U ] = {
679
+ f : (Int , Iterator [T ]) => Iterator [U ], preservesPartitioning : Boolean = false ): RDD [U ] = {
705
680
val func = (context : TaskContext , index : Int , iter : Iterator [T ]) => f(index, iter)
706
681
new MapPartitionsRDD (this , sc.clean(func), preservesPartitioning)
707
682
}
@@ -731,8 +706,7 @@ abstract class RDD[T: ClassTag](
731
706
@ deprecated(" use mapPartitionsWithIndex" , " 0.7.0" )
732
707
@ RDDScope
733
708
def mapPartitionsWithSplit [U : ClassTag ](
734
- f : (Int , Iterator [T ]) => Iterator [U ],
735
- preservesPartitioning : Boolean = false ): RDD [U ] = {
709
+ f : (Int , Iterator [T ]) => Iterator [U ], preservesPartitioning : Boolean = false ): RDD [U ] = {
736
710
mapPartitionsWithIndex(f, preservesPartitioning)
737
711
}
738
712
@@ -826,44 +800,39 @@ abstract class RDD[T: ClassTag](
826
800
@ RDDScope
827
801
def zipPartitions [B : ClassTag , V : ClassTag ]
828
802
(rdd2 : RDD [B ], preservesPartitioning : Boolean )
829
- (f : (Iterator [T ], Iterator [B ]) => Iterator [V ]): RDD [V ] = {
803
+ (f : (Iterator [T ], Iterator [B ]) => Iterator [V ]): RDD [V ] =
830
804
new ZippedPartitionsRDD2 (sc, sc.clean(f), this , rdd2, preservesPartitioning)
831
- }
832
805
833
806
@ RDDScope
834
807
def zipPartitions [B : ClassTag , V : ClassTag ]
835
808
(rdd2 : RDD [B ])
836
- (f : (Iterator [T ], Iterator [B ]) => Iterator [V ]): RDD [V ] = {
809
+ (f : (Iterator [T ], Iterator [B ]) => Iterator [V ]): RDD [V ] =
837
810
zipPartitions(rdd2, preservesPartitioning = false )(f)
838
- }
839
811
840
812
@ RDDScope
841
813
def zipPartitions [B : ClassTag , C : ClassTag , V : ClassTag ]
842
814
(rdd2 : RDD [B ], rdd3 : RDD [C ], preservesPartitioning : Boolean )
843
- (f : (Iterator [T ], Iterator [B ], Iterator [C ]) => Iterator [V ]): RDD [V ] = {
815
+ (f : (Iterator [T ], Iterator [B ], Iterator [C ]) => Iterator [V ]): RDD [V ] =
844
816
new ZippedPartitionsRDD3 (sc, sc.clean(f), this , rdd2, rdd3, preservesPartitioning)
845
- }
846
817
847
818
@ RDDScope
848
819
def zipPartitions [B : ClassTag , C : ClassTag , V : ClassTag ]
849
820
(rdd2 : RDD [B ], rdd3 : RDD [C ])
850
- (f : (Iterator [T ], Iterator [B ], Iterator [C ]) => Iterator [V ]): RDD [V ] = {
821
+ (f : (Iterator [T ], Iterator [B ], Iterator [C ]) => Iterator [V ]): RDD [V ] =
851
822
zipPartitions(rdd2, rdd3, preservesPartitioning = false )(f)
852
- }
853
823
854
824
@ RDDScope
855
825
def zipPartitions [B : ClassTag , C : ClassTag , D : ClassTag , V : ClassTag ]
856
826
(rdd2 : RDD [B ], rdd3 : RDD [C ], rdd4 : RDD [D ], preservesPartitioning : Boolean )
857
- (f : (Iterator [T ], Iterator [B ], Iterator [C ], Iterator [D ]) => Iterator [V ]): RDD [V ] = {
827
+ (f : (Iterator [T ], Iterator [B ], Iterator [C ], Iterator [D ]) => Iterator [V ]): RDD [V ] =
858
828
new ZippedPartitionsRDD4 (sc, sc.clean(f), this , rdd2, rdd3, rdd4, preservesPartitioning)
859
- }
860
829
861
830
@ RDDScope
862
831
def zipPartitions [B : ClassTag , C : ClassTag , D : ClassTag , V : ClassTag ]
863
832
(rdd2 : RDD [B ], rdd3 : RDD [C ], rdd4 : RDD [D ])
864
- (f : (Iterator [T ], Iterator [B ], Iterator [C ], Iterator [D ]) => Iterator [V ]): RDD [V ] = {
833
+ (f : (Iterator [T ], Iterator [B ], Iterator [C ], Iterator [D ]) => Iterator [V ]): RDD [V ] =
865
834
zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false )(f)
866
- }
835
+
867
836
868
837
// Actions (launch a job to return a value to the user program)
869
838
@@ -929,26 +898,21 @@ abstract class RDD[T: ClassTag](
929
898
* RDD will be <= us.
930
899
*/
931
900
@ RDDScope
932
- def subtract (other : RDD [T ]): RDD [T ] = {
901
+ def subtract (other : RDD [T ]): RDD [T ] =
933
902
subtract(other, partitioner.getOrElse(new HashPartitioner (partitions.length)))
934
- }
935
903
936
904
/**
937
905
* Return an RDD with the elements from `this` that are not in `other`.
938
906
*/
939
907
@ RDDScope
940
- def subtract (other : RDD [T ], numPartitions : Int ): RDD [T ] = {
908
+ def subtract (other : RDD [T ], numPartitions : Int ): RDD [T ] =
941
909
subtract(other, new HashPartitioner (numPartitions))
942
- }
943
910
944
911
/**
945
912
* Return an RDD with the elements from `this` that are not in `other`.
946
913
*/
947
914
@ RDDScope
948
- def subtract (
949
- other : RDD [T ],
950
- p : Partitioner )
951
- (implicit ord : Ordering [T ] = null ): RDD [T ] = {
915
+ def subtract (other : RDD [T ], p : Partitioner )(implicit ord : Ordering [T ] = null ): RDD [T ] = {
952
916
if (partitioner == Some (p)) {
953
917
// Our partitioner knows how to handle T (which, since we have a partitioner, is
954
918
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
@@ -1108,9 +1072,7 @@ abstract class RDD[T: ClassTag](
1108
1072
*/
1109
1073
@ Experimental
1110
1074
@ RDDScope
1111
- def countApprox (
1112
- timeout : Long ,
1113
- confidence : Double = 0.95 ): PartialResult [BoundedDouble ] = {
1075
+ def countApprox (timeout : Long , confidence : Double = 0.95 ): PartialResult [BoundedDouble ] = {
1114
1076
val countElements : (TaskContext , Iterator [T ]) => Long = { (ctx, iter) =>
1115
1077
var result = 0L
1116
1078
while (iter.hasNext) {
@@ -1144,7 +1106,8 @@ abstract class RDD[T: ClassTag](
1144
1106
@ RDDScope
1145
1107
def countByValueApprox (timeout : Long , confidence : Double = 0.95 )
1146
1108
(implicit ord : Ordering [T ] = null )
1147
- : PartialResult [Map [T , BoundedDouble ]] = {
1109
+ : PartialResult [Map [T , BoundedDouble ]] =
1110
+ {
1148
1111
if (elementClassTag.runtimeClass.isArray) {
1149
1112
throw new SparkException (" countByValueApprox() does not support arrays" )
1150
1113
}
@@ -1224,9 +1187,7 @@ abstract class RDD[T: ClassTag](
1224
1187
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
1225
1188
*/
1226
1189
@ RDDScope
1227
- def zipWithIndex (): RDD [(T , Long )] = {
1228
- new ZippedWithIndexRDD (this )
1229
- }
1190
+ def zipWithIndex (): RDD [(T , Long )] = new ZippedWithIndexRDD (this )
1230
1191
1231
1192
/**
1232
1193
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
@@ -1297,11 +1258,9 @@ abstract class RDD[T: ClassTag](
1297
1258
* Return the first element in this RDD.
1298
1259
*/
1299
1260
@ RDDScope
1300
- def first (): T = {
1301
- take(1 ) match {
1302
- case Array (t) => t
1303
- case _ => throw new UnsupportedOperationException (" empty collection" )
1304
- }
1261
+ def first (): T = take(1 ) match {
1262
+ case Array (t) => t
1263
+ case _ => throw new UnsupportedOperationException (" empty collection" )
1305
1264
}
1306
1265
1307
1266
/**
@@ -1320,9 +1279,7 @@ abstract class RDD[T: ClassTag](
1320
1279
* @return an array of top elements
1321
1280
*/
1322
1281
@ RDDScope
1323
- def top (num : Int )(implicit ord : Ordering [T ]): Array [T ] = {
1324
- takeOrdered(num)(ord.reverse)
1325
- }
1282
+ def top (num : Int )(implicit ord : Ordering [T ]): Array [T ] = takeOrdered(num)(ord.reverse)
1326
1283
1327
1284
/**
1328
1285
* Returns the first k (smallest) elements from this RDD as defined by the specified
@@ -1367,18 +1324,14 @@ abstract class RDD[T: ClassTag](
1367
1324
* @return the maximum element of the RDD
1368
1325
* */
1369
1326
@ RDDScope
1370
- def max ()(implicit ord : Ordering [T ]): T = {
1371
- this .reduce(ord.max)
1372
- }
1327
+ def max ()(implicit ord : Ordering [T ]): T = this .reduce(ord.max)
1373
1328
1374
1329
/**
1375
1330
* Returns the min of this RDD as defined by the implicit Ordering[T].
1376
1331
* @return the minimum element of the RDD
1377
1332
* */
1378
1333
@ RDDScope
1379
- def min ()(implicit ord : Ordering [T ]): T = {
1380
- this .reduce(ord.min)
1381
- }
1334
+ def min ()(implicit ord : Ordering [T ]): T = this .reduce(ord.min)
1382
1335
1383
1336
/**
1384
1337
* @note due to complications in the internal implementation, this method will raise an
@@ -1389,9 +1342,7 @@ abstract class RDD[T: ClassTag](
1389
1342
* may be empty even when it has at least 1 partition.
1390
1343
*/
1391
1344
@ RDDScope
1392
- def isEmpty (): Boolean = {
1393
- partitions.length == 0 || take(1 ).length == 0
1394
- }
1345
+ def isEmpty (): Boolean = partitions.length == 0 || take(1 ).length == 0
1395
1346
1396
1347
/**
1397
1348
* Save this RDD as a text file, using string representations of elements.
@@ -1425,9 +1376,7 @@ abstract class RDD[T: ClassTag](
1425
1376
* Save this RDD as a compressed text file, using string representations of elements.
1426
1377
*/
1427
1378
@ RDDScope
1428
- def saveAsTextFile (
1429
- path : String ,
1430
- codec : Class [_ <: CompressionCodec ]): Unit = {
1379
+ def saveAsTextFile (path : String , codec : Class [_ <: CompressionCodec ]): Unit = {
1431
1380
// https://issues.apache.org/jira/browse/SPARK-2075
1432
1381
val nullWritableClassTag = implicitly[ClassTag [NullWritable ]]
1433
1382
val textClassTag = implicitly[ClassTag [Text ]]
0 commit comments