Commit 52013d5

Merge remote-tracking branch 'upstream/master' into gbt-api
2 parents: e9b8410 + f90ad5d

80 files changed: +2021, -840 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 3 deletions
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -361,6 +361,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
   }
 
+  /**
+   * Called by the web UI to obtain executor thread dumps. This method may be expensive.
+   * Logs an error and returns None if we failed to obtain a thread dump, which could occur due
+   * to an executor being dead or unresponsive or due to network issues while sending the thread
+   * dump message back to the driver.
+   */
+  private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
+    try {
+      if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+        Some(Utils.getThreadDump())
+      } else {
+        val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
+        val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
+        Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
+          AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Exception getting thread dump from executor $executorId", e)
+        None
+    }
+  }
+
   private[spark] def getLocalProperties: Properties = localProperties.get()
 
   private[spark] def setLocalProperties(props: Properties) {
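
The new method returns None rather than throwing when an executor cannot be reached, so callers can degrade gracefully. A minimal sketch of how driver-side code might consume it (the ThreadDumpProbe object is hypothetical and not part of this commit; it must live in the org.apache.spark package because the method is private[spark]):

package org.apache.spark

// Hypothetical helper, not part of this commit: counts the threads captured in a
// dump for one executor, treating a failed dump (None) as zero threads.
object ThreadDumpProbe {
  def threadCount(sc: SparkContext, executorId: String): Int = {
    sc.getExecutorThreadDump(executorId) match {
      case Some(traces) => traces.length   // one ThreadStackTrace per executor thread
      case None => 0                       // executor dead/unreachable; error already logged
    }
  }
}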

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 4 additions & 2 deletions
@@ -69,11 +69,13 @@ case class FetchFailed(
     bmAddress: BlockManagerId, // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
+    reduceId: Int,
+    message: String)
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
-    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
+      s"message=\n$message\n)"
   }
 }
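
With the extra message field, toErrorString now carries the underlying cause of the fetch failure. A quick illustration (the values are made up; bmAddress may be null, as the case class comment notes):

import org.apache.spark.FetchFailed

// Illustrative values only; the multi-line message is embedded between newlines.
val reason = FetchFailed(bmAddress = null, shuffleId = 1, mapId = 2, reduceId = 3,
  message = "java.io.IOException: Failed to connect to host-1/10.0.0.1:50010")
println(reason.toErrorString)
// FetchFailed(null, shuffleId=1, mapId=2, reduceId=3, message=
// java.io.IOException: Failed to connect to host-1/10.0.0.1:50010
// )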

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 2 additions & 4 deletions
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
-    batchSize: Int) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
           map.put(convertWritable(k), convertWritable(v))
         }
         map
-      case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+      case w: Writable => WritableUtils.clone(w, conf.value.value)
       case other => other
     }
   }
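
Dropping the batchSize check means every Writable is now deep-copied. Cloning matters because Hadoop record readers reuse a single Writable instance across records; a short standalone sketch of that behaviour (not taken from the commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, WritableUtils}

// Standalone sketch: a reused Writable is mutated in place, so only a clone
// preserves the value seen at conversion time.
val conf = new Configuration()
val reused = new Text("record-1")
val snapshot = WritableUtils.clone(reused, conf) // deep copy of the current value
reused.set("record-2")                           // the reader overwrites the same instance
assert(snapshot.toString == "record-1")          // the clone is unaffected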

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 5 additions & 105 deletions
@@ -22,12 +22,10 @@ import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
 import com.google.common.base.Charsets.UTF_8
-import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -442,7 +440,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -468,7 +466,7 @@ private[spark] object PythonRDD extends Logging {
       Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -494,7 +492,7 @@ private[spark] object PythonRDD extends Logging {
       None, inputFormatClass, keyClass, valueClass, conf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -537,7 +535,7 @@ private[spark] object PythonRDD extends Logging {
       Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -563,7 +561,7 @@ private[spark] object PythonRDD extends Logging {
       None, inputFormatClass, keyClass, valueClass, conf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -746,104 +744,6 @@ private[spark] object PythonRDD extends Logging {
       converted.saveAsHadoopDataset(new JobConf(conf))
     }
   }
-
-
-  /**
-   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
-   */
-  @deprecated("PySpark does not use it anymore", "1.1")
-  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      val unpickle = new Unpickler
-      SerDeUtil.initialize()
-      iter.flatMap { row =>
-        unpickle.loads(row) match {
-          // in case of objects are pickled in batch mode
-          case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
-          // not in batch mode
-          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
-        }
-      }
-    }
-  }
-
-  /**
-   * Convert an RDD of serialized Python tuple to Array (no recursive conversions).
-   * It is only used by pyspark.sql.
-   */
-  def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
-
-    def toArray(obj: Any): Array[_] = {
-      obj match {
-        case objs: JArrayList[_] =>
-          objs.toArray
-        case obj if obj.getClass.isArray =>
-          obj.asInstanceOf[Array[_]].toArray
-      }
-    }
-
-    pyRDD.rdd.mapPartitions { iter =>
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        val obj = unpickle.loads(row)
-        if (batched) {
-          obj.asInstanceOf[JArrayList[_]].map(toArray)
-        } else {
-          Seq(toArray(obj))
-        }
-      }
-    }.toJavaRDD()
-  }
-
-  private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
-    private val pickle = new Pickler()
-    private var batch = 1
-    private val buffer = new mutable.ArrayBuffer[Any]
-
-    override def hasNext(): Boolean = iter.hasNext
-
-    override def next(): Array[Byte] = {
-      while (iter.hasNext && buffer.length < batch) {
-        buffer += iter.next()
-      }
-      val bytes = pickle.dumps(buffer.toArray)
-      val size = bytes.length
-      // let 1M < size < 10M
-      if (size < 1024 * 1024) {
-        batch *= 2
-      } else if (size > 1024 * 1024 * 10 && batch > 1) {
-        batch /= 2
-      }
-      buffer.clear()
-      bytes
-    }
-  }
-
-  /**
-   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
-   * PySpark.
-   */
-  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
-  }
-
-  /**
-   * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
-   */
-  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      SerDeUtil.initialize()
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        val obj = unpickle.loads(row)
-        if (batched) {
-          obj.asInstanceOf[JArrayList[_]].asScala
-        } else {
-          Seq(obj)
-        }
-      }
-    }.toJavaRDD()
-  }
 }
 
 private
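
The removed AutoBatchedPickler adjusted its batch size so that each pickled payload stayed roughly between 1 MB and 10 MB. A self-contained sketch of just that sizing rule, detached from the pickle library (the function name is illustrative, not from the commit):

// Sizing rule mirrored from the removed AutoBatchedPickler: double the batch
// while payloads are under 1 MB, halve it (never below 1) once they exceed 10 MB.
def nextBatchSize(batch: Int, lastPayloadBytes: Int): Int = {
  val oneMb = 1024 * 1024
  if (lastPayloadBytes < oneMb) {
    batch * 2
  } else if (lastPayloadBytes > 10 * oneMb && batch > 1) {
    batch / 2
  } else {
    batch
  }
}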
