Skip to content

Commit 227bf33

Browse files
committed
Add a stop flag and some tests
1 parent 37f79c6 commit 227bf33

File tree

2 files changed

+92
-5
lines changed

2 files changed

+92
-5
lines changed

core/src/main/scala/org/apache/spark/util/EventLoop.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.util
1919

20+
import java.util.concurrent.atomic.AtomicBoolean
2021
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
2122

2223
import scala.util.control.NonFatal
@@ -34,12 +35,14 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
3435

3536
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
3637

38+
private val stopped = new AtomicBoolean(false)
39+
3740
private val eventThread = new Thread(name) {
3841
setDaemon(true)
3942

4043
override def run(): Unit = {
4144
try {
42-
while (true) {
45+
while (!stopped.get) {
4346
val event = eventQueue.take()
4447
try {
4548
onReceive(event)
@@ -62,16 +65,23 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
6265
}
6366

6467
def start(): Unit = {
68+
if (stopped.get) {
69+
throw new IllegalStateException(name + " has already been stopped")
70+
}
6571
// Call onStart before starting the event thread to make sure it happens before onReceive
6672
onStart()
6773
eventThread.start()
6874
}
6975

7076
def stop(): Unit = {
71-
eventThread.interrupt()
72-
eventThread.join()
73-
// Call onStop after the event thread exits to make sure onReceive happens before onStop
74-
onStop()
77+
if (stopped.compareAndSet(false ,true)) {
78+
eventThread.interrupt()
79+
eventThread.join()
80+
// Call onStop after the event thread exits to make sure onReceive happens before onStop
81+
onStop()
82+
} else {
83+
// Keep quiet to allow calling `stop` multiple times.
84+
}
7585
}
7686

7787
/**

core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,81 @@ class EventLoopSuite extends FunSuite {
8080
}
8181
eventLoop.stop()
8282
}
83+
84+
test("EventLoop: error thrown from onError should not crash the event thread") {
85+
val e = new RuntimeException("Oops")
86+
val receivedError = new AtomicReference[Throwable]()
87+
val eventLoop = new EventLoop[Int]("test") {
88+
89+
override def onReceive(event: Int): Unit = {
90+
throw e
91+
}
92+
93+
override def onError(e: Throwable): Unit = {
94+
receivedError.set(e)
95+
throw new RuntimeException("Oops")
96+
}
97+
}
98+
eventLoop.start()
99+
eventLoop.post(1)
100+
eventually(timeout(5 seconds), interval(200 millis)) {
101+
assert(e === receivedError.get)
102+
assert(eventLoop.isActive)
103+
}
104+
eventLoop.stop()
105+
}
106+
107+
test("EventLoop: calling stop multiple times should only call onStop once") {
108+
var onStopTimes = 0
109+
val eventLoop = new EventLoop[Int]("test") {
110+
111+
override def onReceive(event: Int): Unit = {
112+
}
113+
114+
override def onError(e: Throwable): Unit = {
115+
}
116+
117+
override def onStop(): Unit = {
118+
onStopTimes += 1
119+
}
120+
}
121+
122+
eventLoop.start()
123+
124+
eventLoop.stop()
125+
eventLoop.stop()
126+
eventLoop.stop()
127+
128+
assert(1 === onStopTimes)
129+
}
130+
131+
test("EventLoop: post event in multiple threads") {
132+
var receivedEventsCount = 0
133+
val eventLoop = new EventLoop[Int]("test") {
134+
135+
override def onReceive(event: Int): Unit = {
136+
receivedEventsCount += 1
137+
}
138+
139+
override def onError(e: Throwable): Unit = {
140+
}
141+
142+
}
143+
eventLoop.start()
144+
145+
val threadNum = 5
146+
val eventsFromEachThread = 100
147+
(1 to threadNum).foreach { _ =>
148+
new Thread() {
149+
override def run(): Unit = {
150+
(1 to eventsFromEachThread).foreach(eventLoop.post)
151+
}
152+
}.start()
153+
}
154+
155+
eventually(timeout(5 seconds), interval(200 millis)) {
156+
assert(threadNum * eventsFromEachThread === receivedEventsCount)
157+
}
158+
eventLoop.stop()
159+
}
83160
}

0 commit comments

Comments
 (0)