
Commit d30fee2

Merge remote-tracking branch 'origin/master' into newCodeGen
Conflicts:
    project/SparkBuild.scala
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
    sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
2 parents: d2ad5c5 + 75db174


460 files changed, +5985 −1872 lines


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ conf/spark-env.sh
 conf/streaming-env.sh
 conf/log4j.properties
 conf/spark-defaults.conf
+conf/hive-site.xml
 docs/_site
 docs/api
 target/

README.md

Lines changed: 9 additions & 2 deletions
@@ -1,6 +1,13 @@
 # Apache Spark
 
-Lightning-Fast Cluster Computing - <http://spark.apache.org/>
+Spark is a fast and general cluster computing system for Big Data. It provides
+high-level APIs in Scala, Java, and Python, and an optimized engine that
+supports general computation graphs for data analysis. It also supports a
+rich set of higher-level tools including Spark SQL for SQL and structured
+data processing, MLLib for machine learning, GraphX for graph processing,
+and Spark Streaming.
+
+<http://spark.apache.org/>
 
 
 ## Online Documentation
@@ -81,7 +88,7 @@ versions without YARN, use:
     $ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly
 
 For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
-with YARN, also set `SPARK_YARN=true`:
+with YARN, also set `-Pyarn`:
 
     # Apache Hadoop 2.0.5-alpha
     $ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly
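
The same `-Pyarn` profile applies to newer Hadoop releases; an invocation following the pattern above (the 2.2.0 version number here is only illustrative, not part of this diff) would be:

    # Apache Hadoop 2.2.X and newer
    $ sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly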

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -114,6 +114,10 @@
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
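
The new `net.jpountz.lz4` dependency presumably backs an LZ4 block-compression codec. Assuming that codec is exposed as `org.apache.spark.io.LZ4CompressionCodec` (the codec class itself is not shown in this excerpt), selecting it would be a one-line configuration change, sketched here:

    import org.apache.spark.SparkConf

    // Sketch only: the codec class name is an assumption; this excerpt shows just the
    // pom dependency. spark.io.compression.codec selects the codec for internal blocks.
    val conf = new SparkConf()
      .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")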

core/src/main/scala/org/apache/spark/Aggregator.scala

Lines changed: 4 additions & 4 deletions
@@ -56,8 +56,8 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       while (iter.hasNext) {
-        val (k, v) = iter.next()
-        combiners.insert(k, v)
+        val pair = iter.next()
+        combiners.insert(pair._1, pair._2)
       }
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
@@ -85,8 +85,8 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
       while (iter.hasNext) {
-        val (k, c) = iter.next()
-        combiners.insert(k, c)
+        val pair = iter.next()
+        combiners.insert(pair._1, pair._2)
       }
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
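
Destructuring with `val (k, v) = iter.next()` desugars to a tuple pattern match on every iteration; keeping the `Tuple2` and reading `_1`/`_2` directly skips that work in this hot loop. A standalone sketch of the two styles (illustrative only, not Spark code):

    // pairs is an illustrative iterator standing in for iter above.
    val pairs: Iterator[(String, Int)] = Iterator("a" -> 1, "b" -> 2)

    // Before: val (k, v) = pairs.next() pattern-matches the tuple on each call.
    // After: keep the tuple and read its fields, as the diff now does.
    while (pairs.hasNext) {
      val pair = pairs.next()
      println(s"${pair._1} -> ${pair._2}")   // stand-in for combiners.insert(pair._1, pair._2)
    }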

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions
@@ -134,8 +134,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[K]
     var partition = 0
-    if (rangeBounds.length < 1000) {
-      // If we have less than 100 partitions naive search
+    if (rangeBounds.length <= 128) {
+      // If we have less than 128 partitions naive search
       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
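
The lowered threshold suggests the naive scan is only used for small bound arrays, with a binary search presumably taking over above 128 bounds (that branch sits outside this hunk). A self-contained sketch of the two lookup strategies over sorted upper bounds; the binary-search branch is an assumption about code not shown here:

    import java.util.Arrays

    // Map a key to a partition index given sorted upper bounds (illustrative Ints).
    def partitionFor(key: Int, rangeBounds: Array[Int]): Int = {
      if (rangeBounds.length <= 128) {
        // Small arrays: linear scan, mirroring the loop in the diff.
        var partition = 0
        while (partition < rangeBounds.length && key > rangeBounds(partition)) {
          partition += 1
        }
        partition
      } else {
        // Larger arrays: binary search; a negative result encodes the insertion point.
        val idx = Arrays.binarySearch(rangeBounds, key)
        if (idx >= 0) idx else -idx - 1
      }
    }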

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 1 deletion
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
             throw new SparkException("YARN mode not available ?", e)
           }
         }
-        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+        val backend = try {
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
+          }
+        }
         scheduler.initialize(backend)
         scheduler
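
Constructing the backend via `Class.forName` keeps spark-core free of a compile-time dependency on the YARN module: the class is only resolved when YARN mode is actually requested, and a missing module surfaces as the wrapped `SparkException`. A stripped-down illustration of the same pattern; the class and constructor below are placeholders, not Spark's:

    // Generic sketch of reflective construction across an optional module boundary.
    trait Backend
    final class Core

    def loadOptionalBackend(core: Core): Backend =
      try {
        val clazz = Class.forName("com.example.OptionalBackend")   // placeholder class name
        val cons = clazz.getConstructor(classOf[Core])
        cons.newInstance(core).asInstanceOf[Backend]
      } catch {
        case e: Exception =>
          throw new RuntimeException("optional backend not on the classpath", e)
      }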

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 2 additions & 4 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -88,10 +89,7 @@ case class ExceptionFailure(
     stackTrace: Array[StackTraceElement],
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
-  override def toErrorString: String = {
-    val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
-    s"$className ($description}\n$stackTraceString"
-  }
+  override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
 }
 
 /**
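
The inline formatting (which also carried a stray `}` in its interpolation) moves into a shared `Utils.exceptionString` helper defined elsewhere in this commit. That helper is not shown in this excerpt; a minimal sketch of what such a formatter might look like, offered purely as an assumption:

    // Hedged sketch only; the real Utils.exceptionString may differ.
    def exceptionString(
        className: String,
        description: String,
        stackTrace: Array[StackTraceElement]): String = {
      val trace = if (stackTrace == null) "" else stackTrace.mkString("\n")
      s"$className: $description\n$trace"
    }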

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -92,8 +92,8 @@ private[spark] object TestUtils {
   def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " { @Override public String toString() { " +
-      "return \"" + value + "\";}}")
+      "public class " + className + " implements java.io.Serializable {" +
+      " @Override public String toString() { return \"" + value + "\"; }}")
 
     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
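
Marking the generated test classes `java.io.Serializable` matters because their instances may themselves be serialized, for example when captured in closures shipped to executors. A self-contained sketch of the difference, using hypothetical stand-in classes:

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    class Plain { override def toString = "plain" }                        // not serializable
    class Marked extends Serializable { override def toString = "marked" }

    def roundTrip(obj: AnyRef): Unit = {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)   // throws NotSerializableException for Plain
      out.close()
    }

    roundTrip(new Marked)                                                  // fine
    try roundTrip(new Plain) catch { case e: NotSerializableException => println(e) }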

core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala

Lines changed: 4 additions & 4 deletions
@@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
    * Actually get the broadcasted value. Concrete implementations of Broadcast class must
    * define their own way to get the value.
    */
-  private[spark] def getValue(): T
+  protected def getValue(): T
 
   /**
    * Actually unpersist the broadcasted value on the executors. Concrete implementations of
    * Broadcast class must define their own logic to unpersist their own data.
    */
-  private[spark] def doUnpersist(blocking: Boolean)
+  protected def doUnpersist(blocking: Boolean)
 
   /**
    * Actually destroy all data and metadata related to this broadcast variable.
    * Implementation of Broadcast class must define their own logic to destroy their own
    * state.
    */
-  private[spark] def doDestroy(blocking: Boolean)
+  protected def doDestroy(blocking: Boolean)
 
   /** Check if this broadcast is valid. If not valid, exception is thrown. */
-  private[spark] def assertValid() {
+  protected def assertValid() {
     if (!_isValid) {
       throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
     }
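
Relaxing these members from `private[spark]` to `protected` means a `Broadcast` implementation no longer has to live inside the `org.apache.spark` package to override them. A minimal, purely illustrative subclass (the class name and behavior are invented for the sketch, and assume only the abstract members shown in this hunk):

    import scala.reflect.ClassTag
    import org.apache.spark.broadcast.Broadcast

    // Illustrative only: serves a locally captured value and has nothing to clean up.
    class LocalOnlyBroadcast[T: ClassTag](id: Long, value: T) extends Broadcast[T](id) {
      override protected def getValue(): T = value
      override protected def doUnpersist(blocking: Boolean): Unit = ()
      override protected def doDestroy(blocking: Boolean): Unit = ()
    }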

core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ private[spark] class BroadcastManager(
     synchronized {
       if (!initialized) {
         val broadcastFactoryClass =
-          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
+          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
 
         broadcastFactory =
           Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
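
With the default flipped to `TorrentBroadcastFactory`, applications that relied on the old HTTP-based behavior can pin it explicitly through the same `spark.broadcast.factory` key, e.g.:

    import org.apache.spark.SparkConf

    // Both factory class names appear in the diff above; "example" is just an illustrative app name.
    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")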
