Skip to content

Commit 078ca72

Browse files
committed
Add executor log url to Executors page on Yarn
1 parent 5aa0f21 commit 078ca72

File tree

11 files changed

+71
-24
lines changed

11 files changed

+71
-24
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
3838
executorId: String,
3939
hostPort: String,
4040
cores: Int,
41-
env: SparkEnv)
41+
env: SparkEnv,
42+
executorLogUrl: String)
4243
extends Actor with ActorLogReceive with ExecutorBackend with Logging {
4344

4445
Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,7 +58,7 @@ private[spark] class CoarseGrainedExecutorBackend(
5758
case RegisteredExecutor =>
5859
logInfo("Successfully registered with driver")
5960
val (hostname, _) = Utils.parseHostPort(hostPort)
60-
executor = new Executor(executorId, hostname, env, isLocal = false)
61+
executor = new Executor(executorId, hostname, env, isLocal = false, executorLogUrl)
6162

6263
case RegisterExecutorFailed(message) =>
6364
logError("Slave registration failed: " + message)
@@ -111,7 +112,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
111112
hostname: String,
112113
cores: Int,
113114
appId: String,
114-
workerUrl: Option[String]) {
115+
workerUrl: Option[String],
116+
executorLogUrl: String) {
115117

116118
SignalLogger.register(log)
117119

@@ -156,7 +158,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
156158
val sparkHostPort = hostname + ":" + boundPort
157159
env.actorSystem.actorOf(
158160
Props(classOf[CoarseGrainedExecutorBackend],
159-
driverUrl, executorId, sparkHostPort, cores, env),
161+
driverUrl, executorId, sparkHostPort, cores, env, executorLogUrl),
160162
name = "Executor")
161163
workerUrl.foreach { url =>
162164
env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -171,15 +173,17 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
171173
System.err.println(
172174
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
173175
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
174-
"<cores> <appid> [<workerUrl>] ")
176+
"<cores> <appid> [<workerUrl>] [executorLogUrl <executorLogUrl>]")
175177
System.exit(1)
176178

177179
// NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
178180
// and CoarseMesosSchedulerBackend (for mesos mode).
179181
case 5 =>
180-
run(args(0), args(1), args(2), args(3).toInt, args(4), None)
182+
run(args(0), args(1), args(2), args(3).toInt, args(4), None, "")
183+
case 7 =>
184+
run(args(0), args(1), args(2), args(3).toInt, args(4), None, args(6))
181185
case x if x > 5 =>
182-
run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
186+
run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)), "")
183187
}
184188
}
185189
}

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ private[spark] class Executor(
4343
executorId: String,
4444
executorHostname: String,
4545
env: SparkEnv,
46-
isLocal: Boolean = false)
46+
isLocal: Boolean = false,
47+
executorLogUrl: String = "")
4748
extends Logging
4849
{
4950

@@ -79,6 +80,7 @@ private[spark] class Executor(
7980

8081
if (!isLocal) {
8182
env.metricsSystem.registerSource(executorSource)
83+
env.blockManager.setExecutorLogUrl(executorLogUrl)
8284
env.blockManager.initialize(conf.getAppId)
8385
}
8486

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
8080
extends SparkListenerEvent
8181

8282
@DeveloperApi
83-
case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
83+
case class SparkListenerBlockManagerAdded(
84+
time: Long,
85+
blockManagerId: BlockManagerId,
86+
maxMem: Long,
87+
executorLogUrl: String = "")
8488
extends SparkListenerEvent
8589

8690
@DeveloperApi

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ private[spark] class BlockManager(
154154
@volatile private var cachedPeers: Seq[BlockManagerId] = _
155155
private val peerFetchLock = new Object
156156
private var lastPeerFetchTime = 0L
157+
private var executorLogUrl = ""
157158

158159
/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
159160
* the initialization of the compression codec until it is first used. The reason is that a Spark
@@ -202,7 +203,7 @@ private[spark] class BlockManager(
202203
blockManagerId
203204
}
204205

205-
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
206+
master.registerBlockManager(blockManagerId, maxMemory, slaveActor, executorLogUrl)
206207

207208
// Register Executors' configuration with the local shuffle service, if one should exist.
208209
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
@@ -256,6 +257,10 @@ private[spark] class BlockManager(
256257
}
257258
}
258259

260+
def setExecutorLogUrl(url: String): Unit = {
261+
executorLogUrl = url
262+
}
263+
259264
/**
260265
* Re-register with the master and report all blocks to it. This will be called by the heart beat
261266
* thread if our heartbeat to the block manager indicates that we were not registered.
@@ -265,7 +270,7 @@ private[spark] class BlockManager(
265270
def reregister(): Unit = {
266271
// TODO: We might need to rate limit re-registering.
267272
logInfo("BlockManager re-registering with master")
268-
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
273+
master.registerBlockManager(blockManagerId, maxMemory, slaveActor, executorLogUrl)
269274
reportAllBlocks()
270275
}
271276

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@ class BlockManagerMaster(
4646
}
4747

4848
/** Register the BlockManager's id with the driver. */
49-
def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
49+
def registerBlockManager(
50+
blockManagerId: BlockManagerId,
51+
maxMemSize: Long,
52+
slaveActor: ActorRef,
53+
executorLogUrl: String) {
5054
logInfo("Trying to register BlockManager")
51-
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
55+
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, executorLogUrl))
5256
logInfo("Registered BlockManager")
5357
}
5458

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
6666
}
6767

6868
override def receiveWithLogging = {
69-
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
70-
register(blockManagerId, maxMemSize, slaveActor)
69+
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, executoLogUrl) =>
70+
register(blockManagerId, maxMemSize, slaveActor, executoLogUrl)
7171
sender ! true
7272

7373
case UpdateBlockInfo(
@@ -325,7 +325,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
325325
).map(_.flatten.toSeq)
326326
}
327327

328-
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
328+
private def register(
329+
id: BlockManagerId,
330+
maxMemSize: Long,
331+
slaveActor: ActorRef,
332+
executorLogUrl: String) {
329333
val time = System.currentTimeMillis()
330334
if (!blockManagerInfo.contains(id)) {
331335
blockManagerIdByExecutor.get(id.executorId) match {
@@ -344,7 +348,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
344348
blockManagerInfo(id) = new BlockManagerInfo(
345349
id, System.currentTimeMillis(), maxMemSize, slaveActor)
346350
}
347-
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
351+
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize, executorLogUrl))
348352
}
349353

350354
private def updateBlockInfo(

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ private[spark] object BlockManagerMessages {
5252
case class RegisterBlockManager(
5353
blockManagerId: BlockManagerId,
5454
maxMemSize: Long,
55-
sender: ActorRef)
55+
sender: ActorRef,
56+
executoLogUrl: String)
5657
extends ToBlockManagerMaster
5758

5859
case class UpdateBlockInfo(

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class StorageStatusListener extends SparkListener {
7676
val blockManagerId = blockManagerAdded.blockManagerId
7777
val executorId = blockManagerId.executorId
7878
val maxMem = blockManagerAdded.maxMem
79-
val storageStatus = new StorageStatus(blockManagerId, maxMem)
79+
val executorLogUrl = blockManagerAdded.executorLogUrl
80+
val storageStatus = new StorageStatus(blockManagerId, maxMem, executorLogUrl)
8081
executorIdToStorageStatus(executorId) = storageStatus
8182
}
8283
}

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
3030
* class cannot mutate the source of the information. Accesses are not thread-safe.
3131
*/
3232
@DeveloperApi
33-
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
33+
class StorageStatus(
34+
val blockManagerId: BlockManagerId,
35+
val maxMem: Long,
36+
val executorLogUrl: String = "") {
3437

3538
/**
3639
* Internal representation of the blocks stored in this block manager.

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ private case class ExecutorSummaryInfo(
4040
totalInputBytes: Long,
4141
totalShuffleRead: Long,
4242
totalShuffleWrite: Long,
43-
maxMemory: Long)
43+
maxMemory: Long,
44+
executorLogUrl: String)
4445

4546
private[ui] class ExecutorsPage(
4647
parent: ExecutorsTab,
@@ -80,6 +81,7 @@ private[ui] class ExecutorsPage(
8081
</span>
8182
</th>
8283
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
84+
<th>Log</th>
8385
</thead>
8486
<tbody>
8587
{execInfoSorted.map(execRow)}
@@ -148,6 +150,9 @@ private[ui] class ExecutorsPage(
148150
Seq.empty
149151
}
150152
}
153+
<td>
154+
<a href={info.executorLogUrl}>Log</a>
155+
</td>
151156
</tr>
152157
}
153158

@@ -168,6 +173,7 @@ private[ui] class ExecutorsPage(
168173
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
169174
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
170175
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
176+
val executorLogUrl = status.executorLogUrl
171177

172178
new ExecutorSummaryInfo(
173179
execId,
@@ -183,7 +189,8 @@ private[ui] class ExecutorsPage(
183189
totalInputBytes,
184190
totalShuffleRead,
185191
totalShuffleWrite,
186-
maxMem
192+
maxMem,
193+
executorLogUrl
187194
)
188195
}
189196
}

0 commit comments

Comments
 (0)