
Commit 9cbf01b

Author: Davies Liu
Merge branch 'master' of github.com:apache/spark into readwrite
2 parents: f0c5a04 + 3a60038 · commit 9cbf01b

21 files changed: +758 −407 lines

core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

   // Methods for creating RDDs

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 17 additions & 7 deletions
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule

-import org.apache.spark.SparkContext
+import org.apache.spark.{Logging, SparkContext}

 /**
  * A general, named code block representing an operation that instantiates RDDs.
@@ -43,9 +43,8 @@ import org.apache.spark.SparkContext
 @JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    val parent: Option[RDDOperationScope] = None) {
-
-  val id: Int = RDDOperationScope.nextScopeId()
+    val parent: Option[RDDOperationScope] = None,
+    val id: String = RDDOperationScope.nextScopeId().toString) {

   def toJson: String = {
     RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -75,7 +74,7 @@ private[spark] class RDDOperationScope(
  * A collection of utility methods to construct a hierarchical representation of RDD scopes.
  * An RDD scope tracks the series of operations that created a given RDD.
  */
-private[spark] object RDDOperationScope {
+private[spark] object RDDOperationScope extends Logging {
   private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
   private val scopeCounter = new AtomicInteger(0)

@@ -88,14 +87,25 @@ private[spark] object RDDOperationScope {

   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
-   * The name of the scope will be the name of the method that immediately encloses this one.
+   * The name of the scope will be the first method name in the stack trace that is not the
+   * same as this method's.
    *
    * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
-    val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
+    val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
+    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
+    // Climb upwards to find the first method that's called something different
+    val callerMethodName = stackTrace
+      .find(_.getMethodName != ourMethodName)
+      .map(_.getMethodName)
+      .getOrElse {
+        // Log a warning just in case, but this should almost certainly never happen
+        logWarning("No valid method name for this RDD operation scope!")
+        "N/A"
+      }
     withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
   }
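
For reference, the following self-contained sketch (hypothetical object and method names, not the Spark source) shows the stack-walking idea the new withScope uses: drop the Thread#getStackTrace frame, remember this method's own name, then climb until a frame with a different name appears. This is why a scope ends up named after the user-facing RDD operation rather than after withScope itself.

object CallerNameDemo {
  def callerName(): String = {
    val stackTrace = Thread.currentThread.getStackTrace.tail // drop the "Thread#getStackTrace" frame
    val ourMethodName = stackTrace.head.getMethodName        // i.e. "callerName"
    // Climb upwards to find the first frame whose method name differs from ours
    stackTrace
      .find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse("N/A") // should not happen for a normal call stack
  }

  def someRddOperation(): String = callerName()

  def main(args: Array[String]): Unit = {
    println(someRddOperation()) // prints "someRddOperation"
  }
}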

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 2 additions & 21 deletions
@@ -18,14 +18,12 @@
 package org.apache.spark.scheduler.local

 import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit

 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.{ThreadUtils, Utils}

 private case class ReviveOffers()

@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
     private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {

-  private val reviveThread =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
-
   private var freeCores = totalCores

   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
     context.reply(true)
   }

-
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    val tasks = scheduler.resourceOffers(offers).flatten
-    for (task <- tasks) {
+    for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
         task.name, task.serializedTask)
     }
-    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
-      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
-      reviveThread.schedule(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          Option(self).foreach(_.send(ReviveOffers))
-        }
-      }, 1000, TimeUnit.MILLISECONDS)
-    }
-  }
-
-  override def onStop(): Unit = {
-    reviveThread.shutdownNow()
   }
 }

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 3 additions & 3 deletions
@@ -116,8 +116,8 @@ private[ui] object RDDOperationGraph extends Logging {
     // which may be nested inside of other clusters
     val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
     val rddClusters = rddScopes.map { scope =>
-      val clusterId = scope.name + "_" + scope.id
-      val clusterName = scope.name
+      val clusterId = scope.id
+      val clusterName = scope.name.replaceAll("\\n", "\\\\n")
       clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
     }
     // Build the cluster hierarchy for this RDD
@@ -177,7 +177,7 @@ private[ui] object RDDOperationGraph extends Logging {

   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} (${node.id})"]"""
+    s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }

   /** Return the dot representation of a subgraph in an RDDOperationGraph. */
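
For illustration only, here is a standalone sketch of the new DOT output (a hypothetical Node class, not Spark's RDDOperationNode): node ids now appear in square brackets, and newlines in cluster names are escaped so they reach dagre-d3 as "\n" rather than breaking the dot syntax.

object DotLabelDemo {
  // Hypothetical stand-in for Spark's RDDOperationNode
  case class Node(id: Int, name: String)

  // Same string interpolation as makeDotNode above, with the new bracketed id
  def makeDotNode(node: Node): String =
    s"""${node.id} [label="${node.name} [${node.id}]"]"""

  def main(args: Array[String]): Unit = {
    println(makeDotNode(Node(3, "map"))) // prints: 3 [label="map [3]"]
    // Escaping turns a real newline into the two characters '\' and 'n' in the dot source
    println("reduceByKey\nat Foo.scala:42".replaceAll("\\n", "\\\\n")) // prints: reduceByKey\nat Foo.scala:42
  }
}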

core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -22,13 +22,13 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.{TaskContext, Partition, SparkContext}

 /**
- *
+ * Tests whether scopes are passed from the RDD operation to the RDDs correctly.
  */
 class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
   private var sc: SparkContext = null
   private val scope1 = new RDDOperationScope("scope1")
-  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
-  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+  private val scope2 = new RDDOperationScope("scope2", Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", Some(scope2))

   before {
     sc = new SparkContext("local", "test")
@@ -48,9 +48,9 @@ class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
     val scope1Json = scope1.toJson
     val scope2Json = scope2.toJson
     val scope3Json = scope3.toJson
-    assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
-    assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
-    assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
+    assert(scope1Json === s"""{"id":"${scope1.id}","name":"scope1"}""")
+    assert(scope2Json === s"""{"id":"${scope2.id}","name":"scope2","parent":$scope1Json}""")
+    assert(scope3Json === s"""{"id":"${scope3.id}","name":"scope3","parent":$scope2Json}""")
     assert(RDDOperationScope.fromJson(scope1Json) === scope1)
     assert(RDDOperationScope.fromJson(scope2Json) === scope2)
     assert(RDDOperationScope.fromJson(scope3Json) === scope3)
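
The quoted ids in the expected JSON follow directly from the field's new type. A minimal sketch with jackson-module-scala (a hypothetical case class, not the Spark one) shows the difference:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for RDDOperationScope with a String id
case class ScopeLike(id: String, name: String)

object JsonQuotingDemo {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  def main(args: Array[String]): Unit = {
    // A String id serializes with quotes, e.g. {"id":"0","name":"scope1"};
    // the old Int id produced an unquoted number, e.g. {"id":0,"name":"scope1"}.
    println(mapper.writeValueAsString(ScopeLike("0", "scope1")))
  }
}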

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

Lines changed: 3 additions & 0 deletions
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)

+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ class KafkaReceiver[
           store((msgAndMetadata.key, msgAndMetadata.message))
         }
       } catch {
-        case e: Throwable => logError("Error handling message; exiting", e)
+        case e: Throwable => reportError("Error handling message; exiting", e)
       }
     }
   }

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 10 additions & 7 deletions
@@ -189,7 +189,7 @@ object KafkaUtils {
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange]
-    ): RDD[(K, V)] = {
+    ): RDD[(K, V)] = sc.withScope {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val leaders = leadersForRanges(kafkaParams, offsetRanges)
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: Map[TopicAndPartition, Broker],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): RDD[R] = {
+    ): RDD[R] = sc.withScope {
     val leaderMap = if (leaders.isEmpty) {
       leadersForRanges(kafkaParams, offsetRanges)
     } else {
@@ -233,7 +233,8 @@ object KafkaUtils {
         case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
       }.toMap
     }
-    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    val cleanedHandler = sc.clean(messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
   }

   /**
@@ -256,7 +257,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       offsetRanges: Array[OffsetRange]
-    ): JavaPairRDD[K, V] = {
+    ): JavaPairRDD[K, V] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +295,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = {
+    ): JavaRDD[R] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,8 +349,9 @@ object KafkaUtils {
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
     ): InputDStream[R] = {
+    val cleanedHandler = ssc.sc.clean(messageHandler)
     new DirectKafkaInputDStream[K, V, KD, VD, R](
-      ssc, kafkaParams, fromOffsets, messageHandler)
+      ssc, kafkaParams, fromOffsets, cleanedHandler)
   }

   /**
@@ -469,11 +471,12 @@ object KafkaUtils {
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
     implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
     createDirectStream[K, V, KD, VD, R](
       jssc.ssc,
       Map(kafkaParams.toSeq: _*),
       Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
-      messageHandler.call _
+      cleanedHandler
     )
   }
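
From a user's perspective the API is unchanged: the RDD returned by KafkaUtils.createRDD is simply built inside a named scope (so it groups under one box in the web UI's DAG visualization), and custom message handlers are closure-cleaned before being shipped. A hedged usage sketch, assuming a broker at localhost:9092 and a topic named "events" (both made up):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object KafkaRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "kafka-rdd-example")
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val offsetRanges = Array(OffsetRange("events", 0, 0L, 100L))

    // The body of createRDD now runs inside sc.withScope, so this KafkaRDD is
    // grouped under a single named scope in the DAG visualization.
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(rdd.count())
    sc.stop()
  }
}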

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ class ReliableKafkaReceiver[
         }
       } catch {
         case e: Exception =>
-          logError("Error handling message", e)
+          reportError("Error handling message", e)
       }
     }
   }
