
Commit d5fcf67 (2 parents: 4389607 + 64262ed)

Merge remote-tracking branch 'upstream/master' into fix_hive_ctas

Conflicts:
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

263 files changed: +2692 / -1811 lines

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -275,7 +275,7 @@
     <dependency>
       <groupId>org.tachyonproject</groupId>
       <artifactId>tachyon-client</artifactId>
-      <version>0.6.1</version>
+      <version>0.5.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 11 additions & 12 deletions
@@ -18,8 +18,6 @@
 package org.apache.spark

 import java.io.{ObjectInputStream, Serializable}
-import java.util.concurrent.atomic.AtomicLong
-import java.lang.ThreadLocal

 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -109,7 +107,7 @@ class Accumulable[R, T] (
    * The typical use of this method is to directly mutate the local value, eg., to add
    * an element to a Set.
    */
-  def localValue = value_
+  def localValue: R = value_

   /**
    * Set the accumulator's value; only allowed on master.
@@ -137,7 +135,7 @@ class Accumulable[R, T] (
     Accumulators.register(this, false)
   }

-  override def toString = if (value_ == null) "null" else value_.toString
+  override def toString: String = if (value_ == null) "null" else value_.toString
 }

 /**
@@ -257,22 +255,22 @@ object AccumulatorParam {

   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double) = 0.0
+    def zero(initialValue: Double): Double = 0.0
   }

   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int) = 0
+    def zero(initialValue: Int): Int = 0
   }

   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0L
+    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+    def zero(initialValue: Long): Long = 0L
   }

   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float) = t1 + t2
-    def zero(initialValue: Float) = 0f
+    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+    def zero(initialValue: Float): Float = 0f
   }

   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
@@ -351,6 +349,7 @@ private[spark] object Accumulators extends Logging {
     }
   }

-  def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
-  def stringifyValue(value: Any) = "%s".format(value)
+  def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)
+
+  def stringifyValue(value: Any): String = "%s".format(value)
 }
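
Note: these hunks change only the declared result types of public members, not behavior. A minimal standalone sketch (hypothetical `Box` class, not Spark code) of what the annotation buys for an API like `Accumulable.localValue`:

```scala
// Standalone sketch, not Spark code: a public member with an inferred result
// type exposes whatever the right-hand side happens to be; stating the type
// explicitly, as the Accumulators.scala hunks above do, pins the contract so
// the public signature cannot drift if the body changes.
object ReturnTypeSketch {
  final class Box[R](private var value_ : R) {
    // Before: `def localValue = value_` left the result type to inference.
    // After: the result type is written out, as in the diff.
    def localValue: R = value_

    override def toString: String = if (value_ == null) "null" else value_.toString
  }

  def main(args: Array[String]): Unit = {
    val b = new Box[Set[Int]](Set(1, 2, 3))
    println(b.localValue) // Set(1, 2, 3)
  }
}
```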

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 3 deletions
@@ -74,7 +74,7 @@ class ShuffleDependency[K, V, C](
     val mapSideCombine: Boolean = false)
   extends Dependency[Product2[K, V]] {

-  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
+  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

   val shuffleId: Int = _rdd.context.newShuffleId()

@@ -91,7 +91,7 @@ class ShuffleDependency[K, V, C](
  */
 @DeveloperApi
 class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
-  override def getParents(partitionId: Int) = List(partitionId)
+  override def getParents(partitionId: Int): List[Int] = List(partitionId)
 }


@@ -107,7 +107,7 @@ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
 class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
   extends NarrowDependency[T](rdd) {

-  override def getParents(partitionId: Int) = {
+  override def getParents(partitionId: Int): List[Int] = {
     if (partitionId >= outStart && partitionId < outStart + length) {
       List(partitionId - outStart + inStart)
     } else {
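
The `getParents` bodies are unchanged; the hunks only name their `List[Int]` result. For reference, the partition arithmetic they implement, as a standalone sketch with no Spark dependency:

```scala
// Standalone sketch of the parent-partition arithmetic shown in the diff above.
object DependencySketch {
  // OneToOneDependency: child partition i reads exactly parent partition i.
  def oneToOneParents(partitionId: Int): List[Int] = List(partitionId)

  // RangeDependency: child partitions [outStart, outStart + length) map back to
  // parent partitions starting at inStart; anything outside that range has no parent.
  def rangeParents(partitionId: Int, inStart: Int, outStart: Int, length: Int): List[Int] =
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }

  def main(args: Array[String]): Unit = {
    println(oneToOneParents(3))                                                   // List(3)
    println(rangeParents(partitionId = 5, inStart = 0, outStart = 3, length = 4)) // List(2)
    println(rangeParents(partitionId = 9, inStart = 0, outStart = 3, length = 4)) // List()
  }
}
```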

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 2 additions & 2 deletions
@@ -168,7 +168,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
     }
   }

-  def jobIds = Seq(jobWaiter.jobId)
+  def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
 }


@@ -276,7 +276,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {

   override def value: Option[Try[T]] = p.future.value

-  def jobIds = jobs
+  def jobIds: Seq[Int] = jobs

 }
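
Annotating `jobIds` as `Seq[Int]` keeps whatever concrete collection backs it out of the public signature. A standalone sketch of the difference, using a hypothetical `Tracker` class rather than the FutureAction classes themselves:

```scala
// Standalone sketch, not Spark code: with no annotation the inferred result type
// would be the concrete List[Int]; the explicit Seq[Int] keeps the contract loose,
// mirroring the jobIds changes above.
object JobIdsSketch {
  class Tracker {
    private var jobs: List[Int] = Nil

    def record(id: Int): Unit = { jobs = jobs :+ id }

    // def jobIds = jobs        // inferred: List[Int] leaks into the signature
    def jobIds: Seq[Int] = jobs // declared: callers depend only on Seq[Int]
  }

  def main(args: Array[String]): Unit = {
    val t = new Tracker
    t.record(1)
    t.record(2)
    println(t.jobIds) // List(1, 2)
  }
}
```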

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
     super.preStart()
   }

-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val unknownExecutor = !scheduler.executorHeartbeatReceived(
         executorId, taskMetrics, blockManagerId)
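
`receiveWithLogging` is a bare pattern-match block, so its type is best spelled out as `PartialFunction[Any, Unit]` rather than left to inference. A standalone sketch of the same shape, with hypothetical messages and no Akka dependency:

```scala
// Standalone sketch: a block of `case` clauses assigned to a value needs an
// expected PartialFunction type, which is what the annotation above supplies.
object ReceiveSketch {
  // Hypothetical stand-ins for the actor messages handled in the diff.
  case class Heartbeat(executorId: String)
  case object Unrelated

  val receiveWithLogging: PartialFunction[Any, Unit] = {
    case Heartbeat(executorId) => println(s"heartbeat from $executorId")
  }

  def main(args: Array[String]): Unit = {
    receiveWithLogging(Heartbeat("exec-1"))            // heartbeat from exec-1
    println(receiveWithLogging.isDefinedAt(Unrelated)) // false
  }
}
```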

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
   extends Actor with ActorLogReceive with Logging {
   val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions
@@ -76,7 +76,7 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
-  def numPartitions = partitions
+  def numPartitions: Int = partitions

   def getPartition(key: Any): Int = key match {
     case null => 0
@@ -154,7 +154,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     }
   }

-  def numPartitions = rangeBounds.length + 1
+  def numPartitions: Int = rangeBounds.length + 1

   private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

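
For context on `numPartitions`: in `HashPartitioner` it simply echoes the constructor argument, and `getPartition` reduces a key's hashCode into that range. A standalone sketch of the arithmetic only (the real class delegates to a non-negative-mod utility and special-cases null keys, as the unchanged lines show):

```scala
// Standalone sketch of hash partitioning: keys land in [0, numPartitions) even
// when hashCode is negative. Not the Spark class itself, just the arithmetic.
object HashPartitionSketch {
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def getPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    println(getPartition("spark", 4))
    println(getPartition(-42, 4)) // still within 0..3
  }
}
```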

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 4 additions & 2 deletions
@@ -28,8 +28,10 @@ import org.apache.spark.util.Utils

 @DeveloperApi
 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
-  def value = t
-  override def toString = t.toString
+
+  def value: T = t
+
+  override def toString: String = t.toString

   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
@@ -133,7 +133,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }

   /** Set multiple parameters together */
-  def setAll(settings: Traversable[(String, String)]) = {
+  def setAll(settings: Traversable[(String, String)]): SparkConf = {
     this.settings.putAll(settings.toMap.asJava)
     this
   }
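
`setAll` returns `this`, and the new `: SparkConf` annotation makes that explicit, which is what lets configuration calls chain. A standalone sketch of that builder-style pattern with a hypothetical `Conf` class (not SparkConf's real implementation):

```scala
// Standalone sketch of the chaining that the explicit return type documents.
object SetAllSketch {
  class Conf {
    private val settings = scala.collection.mutable.Map.empty[String, String]

    def set(key: String, value: String): Conf = { settings(key) = value; this }

    // The signature in the diff takes a Traversable; Iterable keeps this sketch
    // compiling on current Scala versions.
    def setAll(entries: Iterable[(String, String)]): Conf = {
      settings ++= entries
      this
    }

    override def toString: String = settings.toMap.toString
  }

  def main(args: Array[String]): Unit = {
    val conf = new Conf()
      .set("spark.app.name", "demo")
      .setAll(Seq("spark.master" -> "local[2]", "spark.executor.memory" -> "2g"))
    println(conf)
  }
}
```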

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 22 deletions
@@ -986,15 +986,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     union(Seq(first) ++ rest)

   /** Get an RDD that has no partitions or elements. */
-  def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)
+  def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)

   // Methods for creating shared variables

   /**
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
-  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
+  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] =
   {
     val acc = new Accumulator(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
@@ -1006,7 +1006,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
    * driver can access the accumulator's `value`.
    */
-  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
+    : Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
@@ -1018,7 +1019,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
-  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) = {
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
+    : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
@@ -1031,7 +1033,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
-  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) = {
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
+    : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param, Some(name))
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
@@ -1209,7 +1212,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)

   /** The version of Spark on which this application is running. */
-  def version = SPARK_VERSION
+  def version: String = SPARK_VERSION

   /**
    * Return a map from the slave to the max memory available for caching and the remaining
@@ -1659,7 +1662,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
   }

-  def getCheckpointDir = checkpointDir
+  def getCheckpointDir: Option[String] = checkpointDir

   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: Int = {
@@ -1900,28 +1903,28 @@ object SparkContext extends Logging {
     "backward compatibility.", "1.3.0")
   object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double) = 0.0
+    def zero(initialValue: Double): Double = 0.0
   }

   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
     "backward compatibility.", "1.3.0")
   object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int) = 0
+    def zero(initialValue: Int): Int = 0
   }

   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
     "backward compatibility.", "1.3.0")
   object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0L
+    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+    def zero(initialValue: Long): Long = 0L
   }

   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
     "backward compatibility.", "1.3.0")
   object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float) = t1 + t2
-    def zero(initialValue: Float) = 0f
+    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+    def zero(initialValue: Float): Float = 0f
   }

   // The following deprecated functions have already been moved to `object RDD` to
@@ -1931,18 +1934,18 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
-      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
     RDD.rddToPairRDDFunctions(rdd)
-  }

   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
-  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)
+  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] =
+    RDD.rddToAsyncRDDActions(rdd)

   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) = {
+      rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
     val kf = implicitly[K => Writable]
     val vf = implicitly[V => Writable]
     // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
@@ -1954,16 +1957,17 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] =
     RDD.rddToOrderedRDDFunctions(rdd)

   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
-  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)
+  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions =
+    RDD.doubleRDDToDoubleRDDFunctions(rdd)

   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
-  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions =
     RDD.numericRDDToDoubleRDDFunctions(rdd)

   // The following deprecated functions have already been moved to `object WritableFactory` to
@@ -2134,7 +2138,7 @@ object SparkContext extends Logging {
         (backend, scheduler)

       case LOCAL_N_REGEX(threads) =>
-        def localCpuCount = Runtime.getRuntime.availableProcessors()
+        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
         if (threadCount <= 0) {
@@ -2146,7 +2150,7 @@ object SparkContext extends Logging {
         (backend, scheduler)

       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        def localCpuCount = Runtime.getRuntime.availableProcessors()
+        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
         // local[*, M] means the number of cores on the computer with M failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
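
The last two hunks touch the `local[N]` / `local[N, M]` master-string handling. A standalone sketch of that parsing; the regexes and the failure-count default below are written fresh for illustration, not copied from SparkContext:

```scala
// Standalone sketch of local-master parsing: "*" means use all available cores,
// otherwise the bracketed number is the thread count; local[N, M] adds a task
// failure budget. The regexes are illustrative, not Spark's exact ones.
object LocalMasterSketch {
  private val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  private val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def localCpuCount: Int = Runtime.getRuntime.availableProcessors()

  /** Returns Some((threadCount, maxTaskFailures)) for local masters, None otherwise. */
  def parse(master: String): Option[(Int, Int)] = master match {
    case LOCAL_N_REGEX(threads) =>
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      Some((threadCount, 1)) // assumed default of a single allowed failure
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      Some((threadCount, maxFailures.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("local[4]"))    // Some((4,1))
    println(parse("local[*, 3]")) // Some((<cores>,3))
    println(parse("yarn"))        // None
  }
}
```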
