Commit fb9565b

Merge branch 'master' of github.com:apache/spark into profiler

Conflicts: python/pyspark/worker.py

Parents: 116d52a + 729952a

144 files changed: +3923 -2383 lines

.gitignore

Lines changed: 4 additions & 0 deletions

@@ -1,9 +1,12 @@
 *~
+*.#*
+*#*#
 *.swp
 *.ipr
 *.iml
 *.iws
 .idea/
+.idea_modules/
 sbt/*.jar
 .settings
 .cache
@@ -16,6 +19,7 @@ third_party/libmesos.so
 third_party/libmesos.dylib
 conf/java-opts
 conf/*.sh
+conf/*.cmd
 conf/*.properties
 conf/*.conf
 conf/*.xml

.rat-excludes

Lines changed: 2 additions & 0 deletions

@@ -20,6 +20,7 @@ log4j.properties.template
 metrics.properties.template
 slaves
 spark-env.sh
+spark-env.cmd
 spark-env.sh.template
 log4j-defaults.properties
 bootstrap-tooltip.js
@@ -58,3 +59,4 @@ dist/*
 .*iws
 logs
 .*scalastyle-output.xml
+.*dependency-reduced-pom.xml

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion

@@ -8,5 +8,5 @@ submitting any copyrighted material via pull request, email, or other means
 you agree to license the material under the project's open source license and
 warrant that you have the legal authority to do so.
 
-Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
+Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
 for more information.

assembly/pom.xml

Lines changed: 3 additions & 1 deletion

@@ -141,7 +141,9 @@
           <include>com.google.common.**</include>
         </includes>
         <excludes>
-          <exclude>com.google.common.base.Optional**</exclude>
+          <exclude>com/google/common/base/Absent*</exclude>
+          <exclude>com/google/common/base/Optional*</exclude>
+          <exclude>com/google/common/base/Present*</exclude>
         </excludes>
       </relocation>
     </relocations>

bin/spark-sql

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@
 set -o posix
 
 CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-CLASS_NOT_FOUND_EXIT_STATUS=1
+CLASS_NOT_FOUND_EXIT_STATUS=101
 
 # Figure out where Spark is installed
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

core/pom.xml

Lines changed: 2 additions & 0 deletions

@@ -343,7 +343,9 @@
       <filter>
         <artifact>com.google.guava:guava</artifact>
         <includes>
+          <include>com/google/common/base/Absent*</include>
           <include>com/google/common/base/Optional*</include>
+          <include>com/google/common/base/Present*</include>
         </includes>
       </filter>
     </filters>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 22 additions & 10 deletions

@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }
 
   /**
    * Capture the current user callsite and return a formatted version for printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
   }
 
   /**
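
For reference, the call-site override above is now driven by two thread-local properties (CallSite.SHORT_FORM and CallSite.LONG_FORM) instead of the old single "externalCallSite" string, but the public setCallSite/clearCallSite API is unchanged in shape. A minimal usage sketch, not part of this commit; the app name, label, and RDD contents are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: jobs submitted between setCallSite and clearCallSite on this
// thread are reported in the UI and event logs under the given short form
// rather than the automatically captured user stack frame.
object CallSiteExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("call-site-sketch").setMaster("local[*]"))
    val data = sc.parallelize(Seq("a", "bb", "ccc"))

    sc.setCallSite("myLibrary.aggregate")
    try {
      println(data.map(_.length).reduce(_ + _))  // job attributed to "myLibrary.aggregate"
    } finally {
      sc.clearCallSite()                         // restore automatic call-site capture
    }
    sc.stop()
  }
}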

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 1 deletion

@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
       }
     }
   } else {
-    logWarning ("No need to commit output of task: " + taID.value)
+    logInfo ("No need to commit output of task: " + taID.value)
   }
 }

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 25 additions & 6 deletions

@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
     }.toJavaRDD()
   }
 
+  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+    private val pickle = new Pickler()
+    private var batch = 1
+    private val buffer = new mutable.ArrayBuffer[Any]
+
+    override def hasNext(): Boolean = iter.hasNext
+
+    override def next(): Array[Byte] = {
+      while (iter.hasNext && buffer.length < batch) {
+        buffer += iter.next()
+      }
+      val bytes = pickle.dumps(buffer.toArray)
+      val size = bytes.length
+      // let 1M < size < 10M
+      if (size < 1024 * 1024) {
+        batch *= 2
+      } else if (size > 1024 * 1024 * 10 && batch > 1) {
+        batch /= 2
+      }
+      buffer.clear()
+      bytes
+    }
+  }
+
   /**
    * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
    * PySpark.
    */
  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.map { row =>
-        pickle.dumps(row)
-      }
-    }
+    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
   }
 
   /**
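
The new AutoBatchedPickler sizes its batches adaptively instead of pickling one row at a time: the batch doubles while a pickled payload stays under 1 MB and halves once a payload exceeds 10 MB. A standalone sketch of just that heuristic, with thresholds taken from the diff; the object and method names here are illustrative, not Spark APIs:

// Sketch of the batch-size heuristic used by AutoBatchedPickler above.
// The goal is to keep each serialized payload roughly between 1 MB and 10 MB.
object BatchSizing {
  def nextBatchSize(currentBatch: Int, serializedSize: Int): Int = {
    if (serializedSize < 1024 * 1024) {
      currentBatch * 2                         // payload small: batch more items per pickle call
    } else if (serializedSize > 1024 * 1024 * 10 && currentBatch > 1) {
      currentBatch / 2                         // payload large: back off to keep buffers bounded
    } else {
      currentBatch                             // already in the 1 MB..10 MB range
    }
  }

  def main(args: Array[String]): Unit = {
    println(nextBatchSize(1, 200 * 1024))        // 2: doubles while payloads are small
    println(nextBatchSize(64, 20 * 1024 * 1024)) // 32: halves when a batch pickles too large
  }
}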

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 2 additions & 2 deletions

@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
       construct(args ++ Array(""))
     } else if (args.length == 2 && args(1).isInstanceOf[String]) {
       val typecode = args(0).asInstanceOf[String].charAt(0)
-      val data: String = args(1).asInstanceOf[String]
-      construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
+      val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+      construct(typecode, machineCodes(typecode), data)
     } else {
       super.construct(args)
     }
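
The SerDeUtil change converts the pickled string payload to bytes once, up front, and passes the byte array to construct. This relies on ISO-8859-1 mapping the 256 byte values one-to-one onto the first 256 code points, so binary data carried in a String survives the round trip. A small illustrative sketch, not Spark code:

// Demonstrates that bytes -> String -> bytes is lossless under ISO-8859-1,
// which is why getBytes("ISO-8859-1") recovers the raw array data.
object Latin1RoundTrip {
  def main(args: Array[String]): Unit = {
    val original: Array[Byte] = Array(0, 1, 127, -1, -128).map(_.toByte)
    val asString = new String(original, "ISO-8859-1")  // how pickled array data arrives as a String
    val recovered = asString.getBytes("ISO-8859-1")    // the one-time conversion the diff performs
    println(original.sameElements(recovered))          // true
  }
}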
