Skip to content

Commit 96c2c71

Browse files
Kostas Sakellis authored and pwendell committed
[SPARK-4857] [CORE] Adds Executor membership events to SparkListener
Adds onExecutorAdded and onExecutorRemoved events to the SparkListener. This allows a client to be notified when an executor has been added or removed, and provides additional information such as how many vcores the executor is consuming. In addition, this commit adds a SparkListener adapter class for the Java API, JavaSparkListener, that provides default (no-op) implementations of the SparkListener methods. This is to get around the fact that default implementations for traits don't work in Java. Having Java clients extend JavaSparkListener moving forward will prevent breakage in Java when we add new events to SparkListener. Author: Kostas Sakellis <[email protected]> Closes #3711 from ksakellis/kostas-spark-4857 and squashes the following commits: 946d2c5 [Kostas Sakellis] Added executorAdded/Removed events to MesosSchedulerBackend b1d054a [Kostas Sakellis] Remove executorInfo from ExecutorRemoved event 1727b38 [Kostas Sakellis] Renamed ExecutorDetails back to ExecutorInfo and other CR feedback 14fe78d [Kostas Sakellis] Added executor added/removed events to json protocol 93d087b [Kostas Sakellis] [SPARK-4857] [CORE] Adds Executor membership events to SparkListener
1 parent 65858ba commit 96c2c71

18 files changed

+375
-33
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark;
19+
20+
import org.apache.spark.scheduler.SparkListener;
21+
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
22+
import org.apache.spark.scheduler.SparkListenerApplicationStart;
23+
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
24+
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
25+
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
26+
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
27+
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
28+
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
29+
import org.apache.spark.scheduler.SparkListenerJobEnd;
30+
import org.apache.spark.scheduler.SparkListenerJobStart;
31+
import org.apache.spark.scheduler.SparkListenerStageCompleted;
32+
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
33+
import org.apache.spark.scheduler.SparkListenerTaskEnd;
34+
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
35+
import org.apache.spark.scheduler.SparkListenerTaskStart;
36+
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
37+
38+
/**
39+
* Java clients should extend this class instead of implementing
40+
* SparkListener directly. This is to prevent Java clients
41+
* from breaking when new events are added to the SparkListener
42+
* trait.
43+
*
44+
* This is a concrete class instead of abstract to enforce that
45+
* new events get added to both the SparkListener and this adapter
46+
* in lockstep.
47+
*/
48+
public class JavaSparkListener implements SparkListener {
49+
50+
@Override
51+
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
52+
53+
@Override
54+
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
55+
56+
@Override
57+
public void onTaskStart(SparkListenerTaskStart taskStart) { }
58+
59+
@Override
60+
public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
61+
62+
@Override
63+
public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
64+
65+
@Override
66+
public void onJobStart(SparkListenerJobStart jobStart) { }
67+
68+
@Override
69+
public void onJobEnd(SparkListenerJobEnd jobEnd) { }
70+
71+
@Override
72+
public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
73+
74+
@Override
75+
public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
76+
77+
@Override
78+
public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
79+
80+
@Override
81+
public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
82+
83+
@Override
84+
public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
85+
86+
@Override
87+
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
88+
89+
@Override
90+
public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
91+
92+
@Override
93+
public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
94+
95+
@Override
96+
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
97+
}

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
3838
extends Serializable {
3939

4040
@transient var state: ApplicationState.Value = _
41-
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
42-
@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
41+
@transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
42+
@transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
4343
@transient var coresGranted: Int = _
4444
@transient var endTime: Long = _
4545
@transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(
5555

5656
private def init() {
5757
state = ApplicationState.WAITING
58-
executors = new mutable.HashMap[Int, ExecutorInfo]
58+
executors = new mutable.HashMap[Int, ExecutorDesc]
5959
coresGranted = 0
6060
endTime = -1L
6161
appSource = new ApplicationSource(this)
6262
nextExecutorId = 0
63-
removedExecutors = new ArrayBuffer[ExecutorInfo]
63+
removedExecutors = new ArrayBuffer[ExecutorDesc]
6464
}
6565

6666
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
7575
}
7676
}
7777

78-
def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
79-
val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
78+
def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
79+
val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
8080
executors(exec.id) = exec
8181
coresGranted += cores
8282
exec
8383
}
8484

85-
def removeExecutor(exec: ExecutorInfo) {
85+
def removeExecutor(exec: ExecutorDesc) {
8686
if (executors.contains(exec.id)) {
8787
removedExecutors += executors(exec.id)
8888
executors -= exec.id

core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala renamed to core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
1919

2020
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
2121

22-
private[spark] class ExecutorInfo(
22+
private[spark] class ExecutorDesc(
2323
val id: Int,
2424
val application: ApplicationInfo,
2525
val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(
3737

3838
override def equals(other: Any): Boolean = {
3939
other match {
40-
case info: ExecutorInfo =>
40+
case info: ExecutorDesc =>
4141
fullId == info.fullId &&
4242
worker.id == info.worker.id &&
4343
cores == info.cores &&

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ private[spark] class Master(
581581
}
582582
}
583583

584-
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
584+
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
585585
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
586586
worker.addExecutor(exec)
587587
worker.actor ! LaunchExecutor(masterUrl,

core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
3838
Utils.checkHost(host, "Expected hostname")
3939
assert (port > 0)
4040

41-
@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
41+
@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
4242
@transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
4343
@transient var state: WorkerState.Value = _
4444
@transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
7070
host + ":" + port
7171
}
7272

73-
def addExecutor(exec: ExecutorInfo) {
73+
def addExecutor(exec: ExecutorDesc) {
7474
executors(exec.fullId) = exec
7575
coresUsed += exec.cores
7676
memoryUsed += exec.memory
7777
}
7878

79-
def removeExecutor(exec: ExecutorInfo) {
79+
def removeExecutor(exec: ExecutorDesc) {
8080
if (executors.contains(exec.fullId)) {
8181
executors -= exec.fullId
8282
coresUsed -= exec.cores

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.json4s.JValue
2727

2828
import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
2929
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
30-
import org.apache.spark.deploy.master.ExecutorInfo
30+
import org.apache.spark.deploy.master.ExecutorDesc
3131
import org.apache.spark.ui.{UIUtils, WebUIPage}
3232
import org.apache.spark.util.Utils
3333

@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
109109
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
110110
}
111111

112-
private def executorRow(executor: ExecutorInfo): Seq[Node] = {
112+
private def executorRow(executor: ExecutorDesc): Seq[Node] = {
113113
<tr>
114114
<td>{executor.id}</td>
115115
<td>

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ private[spark] class EventLoggingListener(
168168
logEvent(event, flushLogger = true)
169169
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
170170
logEvent(event, flushLogger = true)
171+
override def onExecutorAdded(event: SparkListenerExecutorAdded) =
172+
logEvent(event, flushLogger = true)
173+
override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
174+
logEvent(event, flushLogger = true)
171175

172176
// No-op because logging every update would be overkill
173177
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.mutable
2525
import org.apache.spark.{Logging, TaskEndReason}
2626
import org.apache.spark.annotation.DeveloperApi
2727
import org.apache.spark.executor.TaskMetrics
28+
import org.apache.spark.scheduler.cluster.ExecutorInfo
2829
import org.apache.spark.storage.BlockManagerId
2930
import org.apache.spark.util.{Distribution, Utils}
3031

@@ -84,6 +85,14 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
8485
@DeveloperApi
8586
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
8687

88+
@DeveloperApi
89+
case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
90+
extends SparkListenerEvent
91+
92+
@DeveloperApi
93+
case class SparkListenerExecutorRemoved(executorId: String)
94+
extends SparkListenerEvent
95+
8796
/**
8897
* Periodic updates from executors.
8998
* @param execId executor id
@@ -109,7 +118,8 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
109118
/**
110119
* :: DeveloperApi ::
111120
* Interface for listening to events from the Spark scheduler. Note that this is an internal
112-
* interface which might change in different Spark releases.
121+
* interface which might change in different Spark releases. Java clients should extend
122+
* {@link JavaSparkListener} instead of implementing this trait directly.
113123
*/
114124
@DeveloperApi
115125
trait SparkListener {
@@ -183,6 +193,16 @@ trait SparkListener {
183193
* Called when the driver receives task metrics from an executor in a heartbeat.
184194
*/
185195
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
196+
197+
/**
198+
* Called when the driver registers a new executor.
199+
*/
200+
def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
201+
202+
/**
203+
* Called when the driver removes an executor.
204+
*/
205+
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
186206
}
187207

188208
/**

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ private[spark] trait SparkListenerBus extends Logging {
7070
foreachListener(_.onApplicationEnd(applicationEnd))
7171
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
7272
foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
73+
case executorAdded: SparkListenerExecutorAdded =>
74+
foreachListener(_.onExecutorAdded(executorAdded))
75+
case executorRemoved: SparkListenerExecutorRemoved =>
76+
foreachListener(_.onExecutorRemoved(executorRemoved))
7377
case SparkListenerShutdown =>
7478
}
7579
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import akka.pattern.ask
2828
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
2929

3030
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
31-
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
31+
import org.apache.spark.scheduler._
3232
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
3333
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
3434

@@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
6666
// Number of executors requested from the cluster manager that have not registered yet
6767
private var numPendingExecutors = 0
6868

69+
private val listenerBus = scheduler.sc.listenerBus
70+
6971
// Executors we have requested the cluster manager to kill that have not died yet
7072
private val executorsPendingToRemove = new HashSet[String]
7173

@@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
106108
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
107109
}
108110
}
111+
listenerBus.post(SparkListenerExecutorAdded(executorId, data))
109112
makeOffers()
110113
}
111114

@@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
213216
totalCoreCount.addAndGet(-executorInfo.totalCores)
214217
totalRegisteredExecutors.addAndGet(-1)
215218
scheduler.executorLost(executorId, SlaveLost(reason))
219+
listenerBus.post(SparkListenerExecutorRemoved(executorId))
216220
case None => logError(s"Asked to remove non-existent executor $executorId")
217221
}
218222
}

0 commit comments

Comments
 (0)