Skip to content

Commit 40789c4

Browse files
chrysanxiaGitHub Enterprise
authored andcommitted
[CARMEL-3152] driver scheduling enhancement (#58)
1 parent e1ffe93 commit 40789c4

File tree

16 files changed

+955
-564
lines changed

16 files changed

+955
-564
lines changed

core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20+
import java.util
2021
import java.util.concurrent.LinkedBlockingQueue
2122
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2223

@@ -98,18 +99,40 @@ private class AsyncEventQueue(
9899
}
99100

100101
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
101-
var next: SparkListenerEvent = eventQueue.take()
102-
while (next != POISON_PILL) {
103-
val ctx = processingTime.time()
104-
try {
105-
super.postToAll(next)
106-
} finally {
107-
ctx.stop()
102+
try {
103+
val events = new util.ArrayList[SparkListenerEvent](100)
104+
var isPoisonPill = false
105+
while (!isPoisonPill) {
106+
events.clear()
107+
drainFromEventQueue(events, 99)
108+
val it = events.iterator()
109+
while (it.hasNext) {
110+
val next = it.next()
111+
if (next == POISON_PILL) {
112+
isPoisonPill = true
113+
} else {
114+
val ctx = processingTime.time()
115+
try {
116+
super.postToAll(next)
117+
} finally {
118+
ctx.stop()
119+
}
120+
eventCount.decrementAndGet()
121+
}
122+
}
108123
}
109124
eventCount.decrementAndGet()
110-
next = eventQueue.take()
125+
126+
} catch {
127+
case ie: InterruptedException =>
128+
logInfo(s"Stopping listener queue $name.", ie)
129+
}
130+
}
131+
132+
private def drainFromEventQueue(collection: util.Collection[SparkListenerEvent], size: Int) = {
133+
if (eventQueue.drainTo(collection, size) == 0) {
134+
collection.add(eventQueue.take())
111135
}
112-
eventCount.decrementAndGet()
113136
}
114137

115138
override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 434 additions & 278 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)