Skip to content

Commit df355cd

Browse files
committed
Add metrics to mesos cluster scheduler.
1 parent 20f7284 commit df355cd

File tree

5 files changed

+63
-8
lines changed

5 files changed

+63
-8
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/DriverQueue.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ private[mesos] class DriverQueue(state: MesosClusterPersistenceEngine, capacity:
4444

4545
def isFull = count >= capacity
4646

47+
def size: Int = count
48+
4749
def contains(submissionId: String): Boolean = {
4850
queue.exists(s => s.submissionId.equals(submissionId))
4951
}

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/LaunchedDrivers.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ private[mesos] class LaunchedDrivers(state: MesosClusterPersistenceEngine) {
4040
}
4141
}
4242

43+
def size: Int = drivers.size
44+
4345
def get(submissionId: String): MesosClusterTaskState = drivers(submissionId)
4446

4547
def states: Iterable[MesosClusterTaskState] = {

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,20 @@ import java.util.concurrent.atomic.AtomicLong
2323
import java.util.concurrent.locks.ReentrantLock
2424
import java.util.{Collections, Date, List => JList}
2525

26+
import scala.collection.JavaConversions._
27+
import scala.collection.mutable
28+
import scala.collection.mutable.ArrayBuffer
29+
2630
import org.apache.mesos.Protos.Environment.Variable
2731
import org.apache.mesos.Protos.TaskStatus.Reason
2832
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
2933
import org.apache.mesos.{Scheduler, SchedulerDriver}
34+
3035
import org.apache.spark.deploy.mesos.MesosDriverDescription
3136
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
37+
import org.apache.spark.metrics.MetricsSystem
3238
import org.apache.spark.util.Utils
33-
import org.apache.spark.{SparkConf, SparkException, TaskState}
34-
35-
import scala.collection.JavaConversions._
36-
import scala.collection.mutable
37-
import scala.collection.mutable.ArrayBuffer
39+
import org.apache.spark.{SparkConf, SparkException, SecurityManager, TaskState}
3840

3941

4042
/**
@@ -111,6 +113,8 @@ private[spark] class MesosClusterSchedulerDriver(
111113

112114
var frameworkUrl: String = _
113115

116+
private val metricsSystem =
117+
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
114118
private val master = conf.get("spark.master")
115119
private val appName = conf.get("spark.app.name")
116120
private val queuedCapacity = conf.getInt("spark.deploy.mesos.queuedDrivers", 200)
@@ -123,13 +127,14 @@ private[spark] class MesosClusterSchedulerDriver(
123127
private var frameworkId: String = null
124128

125129
// Stores all the launched and running drivers' states.
126-
private var launchedDrivers: LaunchedDrivers = _
130+
var launchedDrivers: LaunchedDrivers = _
127131

128132
// A queue that stores all the submitted drivers that hasn't been launched.
129-
private var queue: DriverQueue = _
133+
var queue: DriverQueue = _
130134

131135
// All supervised drivers that are waiting to retry after termination.
132-
private var superviseRetryList: SuperviseRetryList = _
136+
var superviseRetryList: SuperviseRetryList = _
137+
133138
private var masterInfo: MasterInfo = _
134139

135140
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -223,11 +228,15 @@ private[spark] class MesosClusterSchedulerDriver(
223228
frameworkId = id
224229
}
225230
recoverState
231+
metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
232+
metricsSystem.start()
226233
startScheduler(
227234
"MesosClusterScheduler", master, MesosClusterSchedulerDriver.this, builder.build())
228235
}
229236

230237
def stop(): Unit = {
238+
metricsSystem.report()
239+
metricsSystem.stop()
231240
driver.stop(true)
232241
}
233242

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler.cluster.mesos
19+
20+
import com.codahale.metrics.{Gauge, MetricRegistry}
21+
22+
import org.apache.spark.metrics.source.Source
23+
24+
private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterSchedulerDriver) extends Source {
25+
override def sourceName: String = "mesos_cluster"
26+
27+
override def metricRegistry: MetricRegistry = new MetricRegistry()
28+
29+
metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] {
30+
override def getValue: Int = scheduler.queue.size
31+
})
32+
33+
metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] {
34+
override def getValue: Int = scheduler.launchedDrivers.size
35+
})
36+
37+
metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] {
38+
override def getValue: Int = scheduler.superviseRetryList.size
39+
})
40+
}

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/SuperviseRetryList.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ private[mesos] class SuperviseRetryList(state: MesosClusterPersistenceEngine) {
5555
state.fetchAll[RetryState]().foreach(drivers.+=)
5656
}
5757

58+
def size: Int = drivers.size
59+
5860
def contains(submissionId: String): Boolean =
5961
drivers.exists(d => d.submission.submissionId.equals(submissionId))
6062

0 commit comments

Comments
 (0)