Skip to content

Commit 0ec4ac7

Browse files
committed
Java API's
1 parent e95bf69 commit 0ec4ac7

File tree

3 files changed

+70
-1
lines changed

3 files changed

+70
-1
lines changed

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
230230
*/
231231
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String],
232232
display: Boolean) extends Accumulable[T,T](initialValue, param, name, display) {
233-
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, true)
233+
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false)
234234
}
235235

236236
/**

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,16 @@ class SparkContext(config: SparkConf) extends Logging {
775775
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
776776
new Accumulable(initialValue, param)
777777

778+
/**
779+
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
780+
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
781+
* access the accumuable's `value`.
782+
* @tparam T accumulator type
783+
* @tparam R type that can be added to the accumulator
784+
*/
785+
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
786+
new Accumulable(initialValue, param, Some(name), true)
787+
778788
/**
779789
* Create an accumulator from a "mutable collection" type.
780790
*

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,40 +414,99 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
414414
def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
415415
sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
416416

417+
/**
418+
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
419+
* to using the `add` method. Only the master can access the accumulator's `value`.
420+
*
421+
* This version supports naming the accumulator for display in Spark's web UI.
422+
*/
423+
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
424+
sc.accumulator(initialValue, name)(IntAccumulatorParam)
425+
.asInstanceOf[Accumulator[java.lang.Integer]]
426+
417427
/**
418428
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
419429
* to using the `add` method. Only the master can access the accumulator's `value`.
420430
*/
421431
def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
422432
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
423433

434+
/**
435+
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
436+
* to using the `add` method. Only the master can access the accumulator's `value`.
437+
*
438+
* This version supports naming the accumulator for display in Spark's web UI.
439+
*/
440+
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
441+
sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
442+
.asInstanceOf[Accumulator[java.lang.Double]]
443+
424444
/**
425445
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
426446
* to using the `add` method. Only the master can access the accumulator's `value`.
427447
*/
428448
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
429449

450+
/**
451+
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
452+
* to using the `add` method. Only the master can access the accumulator's `value`.
453+
*
454+
* This version supports naming the accumulator for display in Spark's web UI.
455+
*/
456+
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
457+
intAccumulator(initialValue, name)
458+
430459
/**
431460
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
432461
* to using the `add` method. Only the master can access the accumulator's `value`.
433462
*/
434463
def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
435464
doubleAccumulator(initialValue)
436465

466+
467+
/**
468+
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
469+
* to using the `add` method. Only the master can access the accumulator's `value`.
470+
*
471+
* This version supports naming the accumulator for display in Spark's web UI.
472+
*/
473+
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
474+
doubleAccumulator(initialValue, name)
475+
437476
/**
438477
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
439478
* values to using the `add` method. Only the master can access the accumulator's `value`.
440479
*/
441480
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
442481
sc.accumulator(initialValue)(accumulatorParam)
443482

483+
/**
484+
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
485+
* values to using the `add` method. Only the master can access the accumulator's `value`.
486+
*
487+
* This version supports naming the accumulator for display in Spark's web UI.
488+
*/
489+
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
490+
: Accumulator[T] =
491+
sc.accumulator(initialValue, name)(accumulatorParam)
492+
444493
/**
445494
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
446495
* can "add" values with `add`. Only the master can access the accumuable's `value`.
447496
*/
448497
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
449498
sc.accumulable(initialValue)(param)
450499

500+
/**
501+
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
502+
* can "add" values with `add`. Only the master can access the accumuable's `value`.
503+
*
504+
* This version supports naming the accumulator for display in Spark's web UI.
505+
*/
506+
def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
507+
: Accumulable[T, R] =
508+
sc.accumulable(initialValue, name)(param)
509+
451510
/**
452511
* Broadcast a read-only variable to the cluster, returning a
453512
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.

0 commit comments

Comments
 (0)