@@ -20,7 +20,6 @@ package org.apache.spark.ui.jobs
20
20
import java .util .Date
21
21
import javax .servlet .http .HttpServletRequest
22
22
23
- import scala .collection .mutable .HashSet
24
23
import scala .xml .{Elem , Node , Unparsed }
25
24
26
25
import org .apache .commons .lang3 .StringEscapeUtils
@@ -209,12 +208,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
209
208
210
209
val unzipped = taskHeadersAndCssClasses.unzip
211
210
212
- val currentTime = System .currentTimeMillis()
213
211
val taskTable = UIUtils .listingTable(
214
212
unzipped._1,
215
213
taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
216
- stageData.hasShuffleRead, stageData.hasShuffleWrite,
217
- stageData.hasBytesSpilled, currentTime),
214
+ stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled),
218
215
tasks,
219
216
headerClasses = unzipped._2)
220
217
// Excludes tasks which failed and have incomplete metrics
@@ -434,219 +431,28 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
434
431
val maybeAccumulableTable : Seq [Node ] =
435
432
if (accumulables.size > 0 ) { <h4 >Accumulators </h4 > ++ accumulableTable } else Seq ()
436
433
437
- val executorsSet = new HashSet [(String , String )]
438
-
439
- var minLaunchTime = Long .MaxValue
440
- var maxFinishTime = Long .MinValue
441
- var numEffectiveTasks = 0
442
- val executorsArrayStr = stageData.taskData.flatMap {
443
- case (_, taskUIData) =>
444
- val taskInfo = taskUIData.taskInfo
445
-
446
- val executorId = taskInfo.executorId
447
- val host = taskInfo.host
448
- executorsSet += ((executorId, host))
449
-
450
- val taskId = taskInfo.taskId
451
- val taskIdWithIndexAndAttempt = s " Task ${taskId}( ${taskInfo.id}) "
452
-
453
- val isSucceeded = taskInfo.successful
454
- val isFailed = taskInfo.failed
455
- val isRunning = taskInfo.running
456
- val classNameByStatus = {
457
- if (isSucceeded) {
458
- " succeeded"
459
- } else if (isFailed) {
460
- " failed"
461
- } else if (isRunning) {
462
- " running"
463
- }
464
- }
465
-
466
- if (isSucceeded || isRunning || isFailed) {
467
- val launchTime = taskInfo.launchTime
468
- val finishTime = if (! isRunning) taskInfo.finishTime else currentTime
469
- val totalExecutionTime = finishTime - launchTime
470
- minLaunchTime = launchTime.min(minLaunchTime)
471
- maxFinishTime = launchTime.max(maxFinishTime)
472
- numEffectiveTasks += 1
473
-
474
- val metricsOpt = taskUIData.taskMetrics
475
- val shuffleReadTime =
476
- metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L ).toDouble
477
- val shuffleReadTimeProportion =
478
- (shuffleReadTime / totalExecutionTime * 100 ).toLong
479
- val shuffleWriteTime =
480
- metricsOpt.flatMap(_.shuffleWriteMetrics.map(_.shuffleWriteTime)).getOrElse(0L ) / 1e6
481
- val shuffleWriteTimeProportion =
482
- (shuffleWriteTime / totalExecutionTime * 100 ).toLong
483
- val executorRuntimeProportion =
484
- ((metricsOpt.map(_.executorRunTime).getOrElse(0L ) -
485
- shuffleReadTime - shuffleWriteTime) / totalExecutionTime * 100 ).toLong
486
- val serializationTimeProportion =
487
- (metricsOpt.map(_.resultSerializationTime).getOrElse(0L ).toDouble /
488
- totalExecutionTime * 100 ).toLong
489
- val deserializationTimeProportion =
490
- (metricsOpt.map(_.executorDeserializeTime).getOrElse(0L ).toDouble /
491
- totalExecutionTime * 100 ).toLong
492
- val gettingResultTimeProportion =
493
- (getGettingResultTime(taskUIData.taskInfo).toDouble / totalExecutionTime * 100 ).toLong
494
- val schedulerDelayProportion =
495
- 100 - executorRuntimeProportion - shuffleReadTimeProportion -
496
- shuffleWriteTimeProportion - serializationTimeProportion -
497
- deserializationTimeProportion - gettingResultTimeProportion
498
-
499
- val schedulerDelayProportionPos = 0
500
- val deserializationTimeProportionPos =
501
- schedulerDelayProportionPos + schedulerDelayProportion
502
- val shuffleReadTimeProportionPos =
503
- deserializationTimeProportionPos + deserializationTimeProportion
504
- val executorRuntimeProportionPos =
505
- shuffleReadTimeProportionPos + shuffleReadTimeProportion
506
- val shuffleWriteTimeProportionPos =
507
- executorRuntimeProportionPos + executorRuntimeProportion
508
- val serializationTimeProportionPos =
509
- shuffleWriteTimeProportionPos + shuffleWriteTimeProportion
510
- val gettingResultTimeProportionPos =
511
- serializationTimeProportionPos + serializationTimeProportion
512
-
513
- val timelineObject =
514
- s """
515
- |{
516
- | 'className': 'task task-assignment-timeline-object ${classNameByStatus}',
517
- | 'group': ' ${executorId}',
518
- | 'content': '<div class="task-assignment-timeline-content">' +
519
- | ' ${taskIdWithIndexAndAttempt}</div>' +
520
- | '<svg class="task-assignment-timeline-duration-bar">' +
521
- | '<rect x=" ${schedulerDelayProportionPos}%" y="0" height="100%"' +
522
- | 'width=" ${schedulerDelayProportion}%" fill="#F6D76B"></rect>' +
523
- | '<rect x=" ${deserializationTimeProportionPos}%" y="0" height="100%"' +
524
- | 'width=" ${deserializationTimeProportion}%" fill="#FFBDD8"></rect>' +
525
- | '<rect x=" ${shuffleReadTimeProportionPos}%" y="0" height="100%"' +
526
- | 'width=" ${shuffleReadTimeProportion}%" fill="#8AC7DE"></rect>' +
527
- | '<rect x=" ${executorRuntimeProportionPos}%" y="0" height="100%"' +
528
- | 'width=" ${executorRuntimeProportion}%" fill="#D9EB52"></rect>' +
529
- | '<rect x=" ${shuffleWriteTimeProportionPos}%" y="0" height="100%"' +
530
- | 'width=" ${shuffleWriteTimeProportion}%" fill="#87796F"></rect>' +
531
- | '<rect x=" ${serializationTimeProportionPos}%" y="0" height="100%"' +
532
- | 'width=" ${serializationTimeProportion}%" fill="#93DFB8"></rect>' +
533
- | '<rect x=" ${gettingResultTimeProportionPos}%" y="0" height="100%"' +
534
- | 'width=" ${gettingResultTimeProportion}%" fill="#FF9036"></rect></svg>',
535
- | 'start': new Date( ${launchTime}),
536
- | 'end': new Date( ${finishTime}),
537
- | 'title': ' ${taskIdWithIndexAndAttempt}\\ nStatus: ${taskInfo.status}\\ n' +
538
- | 'Launch Time: ${UIUtils .formatDate(new Date (launchTime))}' +
539
- | ' ${
540
- if (! isRunning) {
541
- s """ \\ nFinish Time: ${UIUtils .formatDate(new Date (finishTime))}"""
542
- } else {
543
- " "
544
- }
545
- }'
546
- |}
547
- """ .stripMargin
548
- Option (timelineObject)
549
- } else {
550
- None
551
- }
552
- }.mkString(" [" , " ," , " ]" )
553
-
554
- val groupArrayStr = executorsSet.map {
555
- case (executorId, host) =>
556
- s """
557
- |{
558
- | 'id': ' ${executorId}',
559
- | 'content': ' ${executorId} / ${host}',
560
- |}
561
- """ .stripMargin
562
- }.mkString(" [" , " ," , " ]" )
563
-
564
- var maxWindowInSec = ((maxFinishTime - minLaunchTime) / 1000.0 ).round
565
- if (maxWindowInSec <= 0 ) maxWindowInSec = 1
566
- val tasksPerSecond = numEffectiveTasks / maxWindowInSec
567
- var maxZoom = {
568
- if (tasksPerSecond > 100 ) {
569
- 1000L / (tasksPerSecond / 100 )
570
- }
571
- else {
572
- 24L * 60 * 60 * 1000
573
- }
574
- }
575
-
576
- if (maxZoom < 0 ) maxZoom = 1
577
-
578
434
val content =
579
435
summary ++
580
436
showAdditionalMetrics ++
581
437
<h4 >Summary Metrics for {numCompleted} Completed Tasks </h4 > ++
582
438
<div >{summaryTable.getOrElse(" No tasks have reported metrics yet." )}</div > ++
583
439
<h4 >Aggregated Metrics by Executor </h4 > ++ executorTable.toNodeSeq ++
584
440
maybeAccumulableTable ++
585
- <h4 >Tasks </h4 > ++ taskTable ++
586
- <h4 >Task Assignment Timeline </h4 > ++
587
- <div id =" task-assignment-timeline" >
588
- <div class =" timeline-header" >
589
- {taskAssignmentTimelineControlPanel ++ taskAssignmentTimelineLegend}
590
- </div >
591
- </div > ++
592
- <script type =" text/javascript" >
593
- {Unparsed (s " drawTaskAssignmentTimeline( " +
594
- s " ${groupArrayStr}, ${executorsArrayStr}, ${minLaunchTime}, ${maxZoom}) " )}
595
- </script >
441
+ <h4 >Tasks </h4 > ++ taskTable
596
442
597
443
UIUtils .headerSparkPage(" Details for Stage %d" .format(stageId), content, parent)
598
444
}
599
445
}
600
446
601
- private val taskAssignmentTimelineControlPanel : Seq [Node ] = {
602
- <div class =" control-panel" >
603
- <div id =" task-assignment-timeline-zoom-lock" >
604
- <input type =" checkbox" checked =" checked" ></input >
605
- <span >Zoom Lock </span >
606
- </div >
607
- </div >
608
- }
609
-
610
- private val taskAssignmentTimelineLegend : Seq [Node ] = {
611
- <div class =" legend-area" >
612
- <svg >
613
- < rect x= " 5px" y= " 5px" width= " 20px"
614
- height= " 15px" rx= " 2px" fill= " #D5DDF6" stroke= " #97B0F8" ></ rect>
615
- <text x =" 35px" y =" 17px" >Succeeded Task </text >
616
- < rect x= " 215px" y= " 5px" width= " 20px"
617
- height= " 15px" rx= " 2px" fill= " #FF5475" stroke= " #97B0F8" ></ rect>
618
- <text x =" 245px" y =" 17px" >Failed Task </text >
619
- < rect x= " 425px" y= " 5px" width= " 20px"
620
- height= " 15px" rx= " 2px" fill= " #FDFFCA" stroke= " #97B0F8" ></ rect>
621
- <text x =" 455px" y =" 17px" >Running Task </text >
622
- {
623
- val legendPairs = List ((" #FFBDD8" , " Task Deserialization Time" ),
624
- (" #8AC7DE" , " Shuffle Read Time" ), (" #D9EB52" , " Executor Computing Time" ),
625
- (" #87796F" , " Shuffle Write Time" ), (" #93DFB8" , " Result Serialization TIme" ),
626
- (" #FF9036" , " Getting Result Time" ), (" #F6D76B" , " Scheduler Delay" ))
627
-
628
- legendPairs.zipWithIndex.map {
629
- case ((color, name), index) =>
630
- < rect x= {5 + (index / 3 ) * 210 + " px" } y= {35 + (index % 3 ) * 15 + " px" }
631
- width= " 10px" height= " 10px" fill= {color}></ rect>
632
- < text x= {25 + (index / 3 ) * 210 + " px" }
633
- y= {45 + (index % 3 ) * 15 + " px" }> {name}</text >
634
- }
635
- }
636
- </svg >
637
- </div >
638
- }
639
-
640
447
def taskRow (
641
448
hasAccumulators : Boolean ,
642
449
hasInput : Boolean ,
643
450
hasOutput : Boolean ,
644
451
hasShuffleRead : Boolean ,
645
452
hasShuffleWrite : Boolean ,
646
- hasBytesSpilled : Boolean ,
647
- currentTime : Long )(taskData : TaskUIData ): Seq [Node ] = {
453
+ hasBytesSpilled : Boolean )(taskData : TaskUIData ): Seq [Node ] = {
648
454
taskData match { case TaskUIData (info, metrics, errorMessage) =>
649
- val duration = if (info.status == " RUNNING" ) info.timeRunning(currentTime )
455
+ val duration = if (info.status == " RUNNING" ) info.timeRunning(System .currentTimeMillis() )
650
456
else metrics.map(_.executorRunTime).getOrElse(1L )
651
457
val formatDuration = if (info.status == " RUNNING" ) UIUtils .formatDuration(duration)
652
458
else metrics.map(m => UIUtils .formatDuration(m.executorRunTime)).getOrElse(" " )
0 commit comments