Skip to content

Commit 03cc562

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2 parents b0930b0 + 69f7502 commit 03cc562

File tree

6 files changed

+39
-32
lines changed

6 files changed

+39
-32
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717

1818
package org.apache.spark
1919

20+
import scala.language.implicitConversions
21+
2022
import java.io._
2123
import java.net.URI
2224
import java.util.concurrent.atomic.AtomicInteger
2325
import java.util.{Properties, UUID}
2426
import java.util.UUID.randomUUID
2527
import scala.collection.{Map, Set}
28+
import scala.collection.JavaConversions._
2629
import scala.collection.generic.Growable
27-
import scala.collection.mutable.{ArrayBuffer, HashMap}
28-
import scala.language.implicitConversions
30+
import scala.collection.mutable.HashMap
2931
import scala.reflect.{ClassTag, classTag}
3032
import org.apache.hadoop.conf.Configuration
3133
import org.apache.hadoop.fs.Path
@@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging {
836838
}
837839

838840
/**
839-
* Return pools for fair scheduler
840-
* TODO(xiajunluan): We should take nested pools into account
841+
* :: DeveloperApi ::
842+
* Return pools for fair scheduler
841843
*/
842-
def getAllPools: ArrayBuffer[Schedulable] = {
843-
taskScheduler.rootPool.schedulableQueue
844+
@DeveloperApi
845+
def getAllPools: Seq[Schedulable] = {
846+
// TODO(xiajunluan): We should take nested pools into account
847+
taskScheduler.rootPool.schedulableQueue.toSeq
844848
}
845849

846850
/**
851+
* :: DeveloperApi ::
847852
* Return the pool associated with the given name, if one exists
848853
*/
854+
@DeveloperApi
849855
def getPoolForName(pool: String): Option[Schedulable] = {
850-
taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
856+
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
851857
}
852858

853859
/**

core/src/main/scala/org/apache/spark/scheduler/Pool.scala

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.spark.scheduler
1919

20+
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
21+
22+
import scala.collection.JavaConversions._
2023
import scala.collection.mutable.ArrayBuffer
21-
import scala.collection.mutable.HashMap
2224

2325
import org.apache.spark.Logging
2426
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -35,18 +37,15 @@ private[spark] class Pool(
3537
extends Schedulable
3638
with Logging {
3739

38-
var schedulableQueue = new ArrayBuffer[Schedulable]
39-
var schedulableNameToSchedulable = new HashMap[String, Schedulable]
40-
40+
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
41+
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
4142
var weight = initWeight
4243
var minShare = initMinShare
4344
var runningTasks = 0
44-
4545
var priority = 0
4646

4747
// A pool's stage id is used to break the tie in scheduling.
4848
var stageId = -1
49-
5049
var name = poolName
5150
var parent: Pool = null
5251

@@ -60,19 +59,20 @@ private[spark] class Pool(
6059
}
6160

6261
override def addSchedulable(schedulable: Schedulable) {
63-
schedulableQueue += schedulable
64-
schedulableNameToSchedulable(schedulable.name) = schedulable
62+
require(schedulable != null)
63+
schedulableQueue.add(schedulable)
64+
schedulableNameToSchedulable.put(schedulable.name, schedulable)
6565
schedulable.parent = this
6666
}
6767

6868
override def removeSchedulable(schedulable: Schedulable) {
69-
schedulableQueue -= schedulable
70-
schedulableNameToSchedulable -= schedulable.name
69+
schedulableQueue.remove(schedulable)
70+
schedulableNameToSchedulable.remove(schedulable.name)
7171
}
7272

7373
override def getSchedulableByName(schedulableName: String): Schedulable = {
74-
if (schedulableNameToSchedulable.contains(schedulableName)) {
75-
return schedulableNameToSchedulable(schedulableName)
74+
if (schedulableNameToSchedulable.containsKey(schedulableName)) {
75+
return schedulableNameToSchedulable.get(schedulableName)
7676
}
7777
for (schedulable <- schedulableQueue) {
7878
val sched = schedulable.getSchedulableByName(schedulableName)
@@ -95,11 +95,12 @@ private[spark] class Pool(
9595
shouldRevive
9696
}
9797

98-
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
98+
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
9999
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
100-
val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
100+
val sortedSchedulableQueue =
101+
schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
101102
for (schedulable <- sortedSchedulableQueue) {
102-
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
103+
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
103104
}
104105
sortedTaskSetQueue
105106
}

core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.scheduler
1919

20+
import java.util.concurrent.ConcurrentLinkedQueue
21+
2022
import scala.collection.mutable.ArrayBuffer
2123

2224
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -28,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
2830
private[spark] trait Schedulable {
2931
var parent: Pool
3032
// child queues
31-
def schedulableQueue: ArrayBuffer[Schedulable]
33+
def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
3234
def schedulingMode: SchedulingMode
3335
def weight: Int
3436
def minShare: Int
@@ -42,5 +44,5 @@ private[spark] trait Schedulable {
4244
def getSchedulableByName(name: String): Schedulable
4345
def executorLost(executorId: String, host: String): Unit
4446
def checkSpeculatableTasks(): Boolean
45-
def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
47+
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
4648
}

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ private[spark] class TaskSchedulerImpl(
222222
// Build a list of tasks to assign to each worker.
223223
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
224224
val availableCpus = shuffledOffers.map(o => o.cores).toArray
225-
val sortedTaskSets = rootPool.getSortedTaskSetQueue()
225+
val sortedTaskSets = rootPool.getSortedTaskSetQueue
226226
for (taskSet <- sortedTaskSets) {
227227
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
228228
taskSet.parent.name, taskSet.name, taskSet.runningTasks))

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
117117
}
118118

119119
def resourceOffer(rootPool: Pool): Int = {
120-
val taskSetQueue = rootPool.getSortedTaskSetQueue()
120+
val taskSetQueue = rootPool.getSortedTaskSetQueue
121121
/* Just for Test*/
122122
for (manager <- taskSetQueue) {
123123
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(

pom.xml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -496,18 +496,16 @@
496496
<groupId>org.apache.avro</groupId>
497497
<artifactId>avro</artifactId>
498498
<version>${avro.version}</version>
499-
<exclusions>
500-
<exclusion>
501-
<groupId>io.netty</groupId>
502-
<artifactId>netty</artifactId>
503-
</exclusion>
504-
</exclusions>
505499
</dependency>
506500
<dependency>
507501
<groupId>org.apache.avro</groupId>
508502
<artifactId>avro-ipc</artifactId>
509503
<version>${avro.version}</version>
510504
<exclusions>
505+
<exclusion>
506+
<groupId>io.netty</groupId>
507+
<artifactId>netty</artifactId>
508+
</exclusion>
511509
<exclusion>
512510
<groupId>org.mortbay.jetty</groupId>
513511
<artifactId>jetty</artifactId>

0 commit comments

Comments
 (0)