Commit 3ee3b2b

Merge commit with 2 parents: 2b0d513 + fd0b32c

98 files changed: 2365 additions, 1601 deletions


.gitignore

Lines changed: 4 additions & 0 deletions
@@ -1,9 +1,12 @@
 *~
+*.#*
+*#*#
 *.swp
 *.ipr
 *.iml
 *.iws
 .idea/
+.idea_modules/
 sbt/*.jar
 .settings
 .cache
@@ -16,6 +19,7 @@ third_party/libmesos.so
 third_party/libmesos.dylib
 conf/java-opts
 conf/*.sh
+conf/*.cmd
 conf/*.properties
 conf/*.conf
 conf/*.xml

.rat-excludes

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@ log4j.properties.template
 metrics.properties.template
 slaves
 spark-env.sh
+spark-env.cmd
 spark-env.sh.template
 log4j-defaults.properties
 bootstrap-tooltip.js
@@ -58,3 +59,4 @@ dist/*
 .*iws
 logs
 .*scalastyle-output.xml
+.*dependency-reduced-pom.xml

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
@@ -8,5 +8,5 @@ submitting any copyrighted material via pull request, email, or other means
 you agree to license the material under the project's open source license and
 warrant that you have the legal authority to do so.
 
-Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
+Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
 for more information.

bin/spark-sql

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
 set -o posix
 
 CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-CLASS_NOT_FOUND_EXIT_STATUS=1
+CLASS_NOT_FOUND_EXIT_STATUS=101
 
 # Figure out where Spark is installed
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo ("No need to commit output of task: " + taID.value)
     }
   }
 
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 25 additions & 6 deletions
@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
     }.toJavaRDD()
   }
 
+  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+    private val pickle = new Pickler()
+    private var batch = 1
+    private val buffer = new mutable.ArrayBuffer[Any]
+
+    override def hasNext(): Boolean = iter.hasNext
+
+    override def next(): Array[Byte] = {
+      while (iter.hasNext && buffer.length < batch) {
+        buffer += iter.next()
+      }
+      val bytes = pickle.dumps(buffer.toArray)
+      val size = bytes.length
+      // let 1M < size < 10M
+      if (size < 1024 * 1024) {
+        batch *= 2
+      } else if (size > 1024 * 1024 * 10 && batch > 1) {
+        batch /= 2
+      }
+      buffer.clear()
+      bytes
+    }
+  }
+
   /**
    * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
    * PySpark.
    */
   def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.map { row =>
-        pickle.dumps(row)
-      }
-    }
+    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
   }
 
   /**
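
The new AutoBatchedPickler grows its batch geometrically until a pickled batch lands in the 1–10 MB window, instead of pickling one row at a time. A minimal standalone sketch of that sizing rule follows; the object and method names here are illustrative and not part of the commit.

// Illustrative sketch of the batch-sizing heuristic above; not part of the patch.
object BatchSizingSketch {
  // Double the batch while a pickled batch is under 1 MB, halve it above 10 MB.
  def nextBatch(batch: Int, pickledSize: Int): Int =
    if (pickledSize < 1024 * 1024) batch * 2
    else if (pickledSize > 1024 * 1024 * 10 && batch > 1) batch / 2
    else batch

  def main(args: Array[String]): Unit = {
    // Assume roughly 2 KB per pickled element: the batch doubles 1, 2, 4, ...
    // until a batch crosses 1 MB (512 elements here) and then stays put.
    var batch = 1
    for (_ <- 1 to 12) {
      batch = nextBatch(batch, batch * 2048)
      print(s"$batch ")
    }
    println()
  }
}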

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 2 additions & 2 deletions
@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
         construct(args ++ Array(""))
       } else if (args.length == 2 && args(1).isInstanceOf[String]) {
         val typecode = args(0).asInstanceOf[String].charAt(0)
-        val data: String = args(1).asInstanceOf[String]
-        construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
+        val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+        construct(typecode, machineCodes(typecode), data)
       } else {
         super.construct(args)
       }
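
This change leans on ISO-8859-1 being a lossless byte-to-char encoding: every byte value 0–255 maps to exactly one char, so bytes decoded into a String and re-encoded with the same charset come back unchanged. A quick self-contained check of that property, not taken from the patch:

// Standalone check, not part of the commit: bytes -> String -> bytes under
// ISO-8859-1 is lossless for every possible byte value.
object Latin1RoundTrip {
  def main(args: Array[String]): Unit = {
    val raw: Array[Byte] = (0 until 256).map(_.toByte).toArray
    val asString = new String(raw, "ISO-8859-1")
    assert(asString.getBytes("ISO-8859-1").sameElements(raw))
    println("ISO-8859-1 round trip is lossless for all byte values")
  }
}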

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 5 additions & 4 deletions
@@ -54,7 +54,7 @@ object SparkSubmit {
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
 
-  private val CLASS_NOT_FOUND_EXIT_STATUS = 1
+  private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
   // Exposed for testing
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
@@ -172,7 +172,7 @@ object SparkSubmit {
       // All cluster managers
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -183,6 +183,7 @@ object SparkSubmit {
         sysProp = "spark.driver.extraLibraryPath"),
 
       // Standalone cluster only
+      OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
 
@@ -261,7 +262,7 @@
     }
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
-    if (clusterManager == YARN && deployMode == CLUSTER) {
+    if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.primaryResource != SPARK_INTERNAL) {
         childArgs += ("--jar", args.primaryResource)
@@ -279,7 +280,7 @@
     }
 
     // Read from default spark properties, if any
-    for ((k, v) <- args.getDefaultSparkProperties) {
+    for ((k, v) <- args.defaultSparkProperties) {
      sysProps.getOrElseUpdate(k, v)
     }
 
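
The exit status for a missing main class changes from 1 to 101 here and in bin/spark-sql, which lets callers tell "class not found" apart from a generic failure. A hedged sketch of how a launcher might branch on that code using scala.sys.process; the constant mirrors CLASS_NOT_FOUND_EXIT_STATUS above, while the script path and arguments are placeholders, not taken from this commit.

import scala.sys.process._

// Sketch only: run spark-submit and interpret its exit status.
object SubmitAndCheck {
  val ClassNotFoundExitStatus = 101  // matches CLASS_NOT_FOUND_EXIT_STATUS above

  def main(args: Array[String]): Unit = {
    // Placeholder command line; adjust the path, class, and jar for a real run.
    val exitCode = Seq("./bin/spark-submit", "--class", "com.example.Main", "app.jar").!
    if (exitCode == ClassNotFoundExitStatus) {
      println("main class not found on the classpath")
    } else if (exitCode != 0) {
      println(s"spark-submit failed with exit code $exitCode")
    }
  }
}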

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 9 additions & 8 deletions
@@ -57,12 +57,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var pyFiles: String = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
 
-  parseOpts(args.toList)
-  mergeSparkProperties()
-  checkRequiredArguments()
-
-  /** Return default present in the currently defined defaults file. */
-  def getDefaultSparkProperties = {
+  /** Default properties present in the currently defined defaults file. */
+  lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
@@ -79,6 +75,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }
 
+  parseOpts(args.toList)
+  mergeSparkProperties()
+  checkRequiredArguments()
+
   /**
    * Fill in any undefined values based on the default properties file or options passed in through
    * the '--conf' flag.
@@ -107,7 +107,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }
 
-    val properties = getDefaultSparkProperties
+    val properties = HashMap[String, String]()
+    properties.putAll(defaultSparkProperties)
     properties.putAll(sparkProperties)
 
     // Use properties file as fallback for values which have a direct analog to
@@ -213,7 +214,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     | verbose $verbose
     |
     |Default properties from $propertiesFile:
-    |${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
+    |${defaultSparkProperties.mkString(" ", "\n ", "\n")}
     """.stripMargin
   }
 
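
Turning getDefaultSparkProperties into a lazy val, and moving parseOpts/mergeSparkProperties/checkRequiredArguments below it, means the defaults file is read only once the parsed propertiesFile and verbose fields are in place, and only on first use. A small sketch of the initialization-order hazard this avoids; the class and field names are illustrative only, not the real SparkSubmitArguments.

// Illustrative only; not the actual SparkSubmitArguments class.
class ArgsSketch(args: Seq[String]) {
  var propertiesFile: String = null

  // As an eager val this would run during construction, before parse() below
  // has set propertiesFile; as a lazy val it runs on first access instead.
  lazy val defaultProps: Map[String, String] =
    Option(propertiesFile).map(f => Map("spark.defaults.source" -> f)).getOrElse(Map.empty)

  private def parse(): Unit = {
    propertiesFile = args.headOption.orNull
  }

  parse()  // constructor body: fields are populated before defaultProps is ever forced
}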

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 3 additions & 1 deletion
@@ -67,6 +67,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
   }
 
   private val appHeader = Seq(
+    "App ID",
     "App Name",
     "Started",
     "Completed",
@@ -81,7 +82,8 @@
     val duration = UIUtils.formatDuration(info.endTime - info.startTime)
     val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
-      <td><a href={uiAddress}>{info.name}</a></td>
+      <td><a href={uiAddress}>{info.id}</a></td>
+      <td>{info.name}</td>
       <td>{startTime}</td>
       <td>{endTime}</td>
       <td>{duration}</td>
