@@ -54,6 +54,9 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
54
54
55
55
private var appId : String = null
56
56
private var activeJobs : Set [Int ] = Set ()
57
+ private var executorEventId : Long = 0L
58
+
59
+ private var coresPerTask : Int = 1
57
60
58
61
override def onApplicationStart (event : SparkListenerApplicationStart ): Unit = {
59
62
assert(event.appId.isDefined, " Application without IDs are not supported." )
@@ -96,6 +99,8 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
96
99
details(" System Properties" ),
97
100
details(" Classpath Entries" ))
98
101
102
+ coresPerTask = envInfo.sparkProperties.toMap.get(" spark.task.cpus" ).map(_.toInt)
103
+ .getOrElse(coresPerTask)
99
104
kvstore.write(new ApplicationEnvironmentInfoWrapper (envInfo))
100
105
}
101
106
@@ -129,9 +134,12 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
129
134
hostPort = event.executorInfo.executorHost,
130
135
isActive = true ,
131
136
totalCores = event.executorInfo.totalCores,
137
+ maxTasks = event.executorInfo.totalCores / coresPerTask,
132
138
executorLogs = event.executorInfo.logUrlMap)
133
139
new ExecutorSummaryWrapper (newInfo)
134
140
}
141
+
142
+ writeExecutorEvent(event)
135
143
}
136
144
137
145
override def onExecutorRemoved (event : SparkListenerExecutorRemoved ): Unit = {
@@ -140,12 +148,19 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
140
148
isActive = false )
141
149
new ExecutorSummaryWrapper (newInfo)
142
150
}
151
+
152
+ writeExecutorEvent(event)
143
153
}
144
154
145
155
override def onExecutorBlacklisted (event : SparkListenerExecutorBlacklisted ): Unit = {
146
156
updateBlackListStatus(event.executorId, true )
147
157
}
148
158
159
+ private def writeExecutorEvent (event : SparkListenerEvent ): Unit = {
160
+ executorEventId += 1
161
+ kvstore.write(new ExecutorEventData (executorEventId, event))
162
+ }
163
+
149
164
override def onExecutorUnblacklisted (event : SparkListenerExecutorUnblacklisted ): Unit = {
150
165
updateBlackListStatus(event.executorId, false )
151
166
}
@@ -342,6 +357,14 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
342
357
numActiveTasks = stage.info.numActiveTasks + 1 )
343
358
newStageDataWrapper(stage, newInfo)
344
359
}
360
+
361
+ updateExecutorSummary(event.taskInfo.executorId) { uiexec =>
362
+ val updated = newExecutorSummary(
363
+ uiexec.info,
364
+ activeTasks = uiexec.info.activeTasks + 1 ,
365
+ totalTasks = uiexec.info.totalTasks + 1 )
366
+ new ExecutorSummaryWrapper (updated)
367
+ }
345
368
}
346
369
347
370
override def onTaskGettingResult (event : SparkListenerTaskGettingResult ): Unit = {
@@ -440,6 +463,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
440
463
newExecutorStageSummary(
441
464
exec,
442
465
failedTasks = exec.failedTasks + failedDelta,
466
+ succeededTasks = exec.succeededTasks + completedDelta,
443
467
inputBytes = exec.inputBytes + metricsDelta.inputMetrics.bytesRead,
444
468
outputBytes = exec.outputBytes + metricsDelta.outputMetrics.bytesWritten,
445
469
shuffleRead = exec.shuffleRead + metricsDelta.shuffleReadMetrics.localBytesRead +
@@ -448,6 +472,30 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
448
472
memoryBytesSpilled = exec.memoryBytesSpilled + metricsDelta.memoryBytesSpilled,
449
473
diskBytesSpilled = exec.diskBytesSpilled + metricsDelta.diskBytesSpilled)
450
474
}
475
+
476
+ updateExecutorSummary(event.taskInfo.executorId) { uiexec =>
477
+ val (gcTime, inputBytes, shuffleRead, shuffleWrite) = if (event.taskMetrics != null ) {
478
+ val readMetrics = event.taskMetrics.shuffleReadMetrics
479
+ (event.taskMetrics.jvmGCTime,
480
+ event.taskMetrics.inputMetrics.bytesRead,
481
+ readMetrics.localBytesRead + readMetrics.remoteBytesRead,
482
+ event.taskMetrics.shuffleWriteMetrics.bytesWritten)
483
+ } else {
484
+ (0L , 0L , 0L , 0L )
485
+ }
486
+
487
+ val updated = newExecutorSummary(
488
+ uiexec.info,
489
+ activeTasks = uiexec.info.activeTasks - 1 ,
490
+ completedTasks = uiexec.info.completedTasks + completedDelta,
491
+ failedTasks = uiexec.info.failedTasks + failedDelta,
492
+ totalDuration = uiexec.info.totalDuration + event.taskInfo.duration,
493
+ totalGCTime = uiexec.info.totalGCTime + gcTime,
494
+ totalInputBytes = uiexec.info.totalInputBytes + inputBytes,
495
+ totalShuffleRead = uiexec.info.totalShuffleRead + shuffleRead,
496
+ totalShuffleWrite = uiexec.info.totalShuffleWrite + shuffleWrite)
497
+ new ExecutorSummaryWrapper (updated)
498
+ }
451
499
}
452
500
453
501
override def onStageCompleted (event : SparkListenerStageCompleted ): Unit = {
@@ -486,10 +534,24 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
486
534
}
487
535
488
536
override def onBlockManagerAdded (event : SparkListenerBlockManagerAdded ): Unit = {
537
+ // This needs to set fields that are already set by onExecutorAdded because the driver is
538
+ // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event.
489
539
updateExecutorSummary(event.blockManagerId.executorId) { exec =>
540
+ // Only create memory metrics if the event has the info; this avoid setting bogus values
541
+ // when replaying old application logs.
542
+ val memMetrics = event.maxOnHeapMem.map { _ =>
543
+ newMemoryMetrics(
544
+ exec.info.memoryMetrics,
545
+ totalOnHeapStorageMemory = event.maxOnHeapMem,
546
+ totalOffHeapStorageMemory = event.maxOffHeapMem)
547
+ }
548
+
490
549
val updated = newExecutorSummary(
491
550
exec.info,
492
- maxMemory = event.maxMem)
551
+ hostPort = event.blockManagerId.hostPort,
552
+ isActive = true ,
553
+ maxMemory = event.maxMem,
554
+ memoryMetrics = memMetrics)
493
555
new ExecutorSummaryWrapper (updated)
494
556
}
495
557
}
@@ -566,6 +628,12 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
566
628
// Function to apply a delta to a value, but ensure that it doesn't go negative.
567
629
def newValue (old : Long , delta : Long ): Long = math.max(0 , old + delta)
568
630
631
+ // Function to calculate the remaining memory after applying a delta. Assumes that if
632
+ // one value is provided, all the values are.
633
+ def remainingMemory (max : Option [Long ], oldv : Option [Long ], newv : Option [Long ]): Option [Long ] = {
634
+ max.map { m => newValue(m, oldv.get - newv.get) }
635
+ }
636
+
569
637
// If the storage level is NONE, then don't update the storage level of existing information.
570
638
val updatedStorageLevel = if (storageLevel.useMemory || storageLevel.useDisk) {
571
639
Some (storageLevel.description)
@@ -579,6 +647,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
579
647
new ExecutorSummaryWrapper (newExecutorSummary(None , id = executorId)))
580
648
581
649
var rddBlocksDelta = 0
650
+ val memMetrics = executorInfo.info.memoryMetrics
582
651
583
652
// Update the block entry in the RDD info, keeping track of the deltas above so that we
584
653
// can update the executor information too.
@@ -634,12 +703,31 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
634
703
val newDistMem = newValue(oldDist.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult)
635
704
val newDistDisk = newValue(oldDist.diskUsed, event.blockUpdatedInfo.diskSize * diskMult)
636
705
val newDists = if (newDistMem > 0 || newDistDisk > 0 ) {
706
+ val newOffHeap = if (storageLevel.useOffHeap) Some (newDistMem) else None
707
+ val newOnHeap = if (! storageLevel.useOffHeap) Some (newDistMem) else None
708
+ val remainingOffHeap = if (storageLevel.useOffHeap) {
709
+ remainingMemory(memMetrics.map(_.totalOffHeapStorageMemory), oldDist.offHeapMemoryUsed,
710
+ newOffHeap)
711
+ } else {
712
+ None
713
+ }
714
+ val remainingOnHeap = if (! storageLevel.useOffHeap) {
715
+ remainingMemory(memMetrics.map(_.totalOnHeapStorageMemory), oldDist.onHeapMemoryUsed,
716
+ newOnHeap)
717
+ } else {
718
+ None
719
+ }
720
+
637
721
val newDist = newRDDDataDistribution(
638
722
oldDist,
639
723
memoryUsed = newDistMem,
640
724
memoryRemaining = newValue(oldDist.memoryRemaining,
641
725
event.blockUpdatedInfo.memSize * memoryMult * - 1 ),
642
- diskUsed = newDistDisk)
726
+ diskUsed = newDistDisk,
727
+ onHeapMemoryUsed = newOnHeap,
728
+ offHeapMemoryUsed = newOffHeap,
729
+ onHeapMemoryRemaining = remainingOnHeap,
730
+ offHeapMemoryRemaining = remainingOffHeap)
643
731
Seq (newDist)
644
732
} else {
645
733
Nil
@@ -656,13 +744,30 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
656
744
}
657
745
658
746
// Update the ExecutorSummary for the block's manager.
747
+ val updatedMemMetrics = memMetrics.map { m =>
748
+ val onHeapUsed = if (! storageLevel.useOffHeap) {
749
+ newValue(m.usedOnHeapStorageMemory, event.blockUpdatedInfo.memSize * memoryMult)
750
+ } else {
751
+ m.usedOnHeapStorageMemory
752
+ }
753
+ val offHeapUsed = if (storageLevel.useOffHeap) {
754
+ newValue(m.usedOffHeapStorageMemory, event.blockUpdatedInfo.memSize * memoryMult)
755
+ } else {
756
+ m.usedOffHeapStorageMemory
757
+ }
758
+ newMemoryMetrics(m,
759
+ usedOnHeapStorageMemory = onHeapUsed,
760
+ usedOffHeapStorageMemory = offHeapUsed)
761
+ }
762
+
659
763
val newExecSummary = newExecutorSummary(
660
764
executorInfo.info,
661
765
rddBlocks = newValue(executorInfo.info.rddBlocks, rddBlocksDelta).toInt,
662
766
memoryUsed = newValue(executorInfo.info.memoryUsed,
663
767
event.blockUpdatedInfo.memSize * memoryMult),
664
768
diskUsed = newValue(executorInfo.info.diskUsed,
665
- event.blockUpdatedInfo.diskSize * diskMult))
769
+ event.blockUpdatedInfo.diskSize * diskMult),
770
+ memoryMetrics = updatedMemMetrics)
666
771
kvstore.write(new ExecutorSummaryWrapper (newExecSummary))
667
772
}
668
773
@@ -819,6 +924,19 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
819
924
option(memoryMetrics, old.map(_.memoryMetrics)))
820
925
}
821
926
927
+ private def newMemoryMetrics (
928
+ old : Option [v1.MemoryMetrics ],
929
+ usedOnHeapStorageMemory : Option [Long ] = None ,
930
+ usedOffHeapStorageMemory : Option [Long ] = None ,
931
+ totalOnHeapStorageMemory : Option [Long ] = None ,
932
+ totalOffHeapStorageMemory : Option [Long ] = None ): v1.MemoryMetrics = {
933
+ new v1.MemoryMetrics (
934
+ value(usedOnHeapStorageMemory, old.map(_.usedOnHeapStorageMemory), 0L ),
935
+ value(usedOffHeapStorageMemory, old.map(_.usedOffHeapStorageMemory), 0L ),
936
+ value(totalOnHeapStorageMemory, old.map(_.totalOnHeapStorageMemory), 0L ),
937
+ value(totalOffHeapStorageMemory, old.map(_.totalOffHeapStorageMemory), 0L ))
938
+ }
939
+
822
940
private def newJobData (
823
941
old : Option [v1.JobData ],
824
942
jobId : Option [Int ] = None ,
0 commit comments