-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-5214][Core] Add EventLoop and change DAGScheduler to an EventLoop #4016
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cc @rxin |
Test build #25466 has started for PR 4016 at commit
|
Test build #25468 has started for PR 4016 at commit
|
Test build #25466 has finished for PR 4016 at commit
|
Test PASSed. |
Test build #25468 has finished for PR 4016 at commit
|
Test FAILed. |
Jenkins, retest this please. |
Test build #25469 has started for PR 4016 at commit
|
Test build #25469 has finished for PR 4016 at commit
|
Test PASSed. |
* An event loop to receive events from the caller and process all events in the event thread. It | ||
* will start an exclusive event thread to process all events. | ||
*/ | ||
abstract class EventLoop[E](name: String) extends Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be private[spark]
Looks pretty good to me. Since it is an important component, might be worth getting more pairs of eyes to look at it. |
} | ||
|
||
def stop(): Unit = { | ||
eventThread.interrupt() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, the interrupt() flag may be cleared and an exception never thrown, so I don't think we should rely only on this mechanism to stop the thread (in particular if the event loop is inside the onReceive() method). Can we also set a volatile variable which is checked in the while loop?
import org.scalatest.concurrent.Eventually._ | ||
import org.scalatest.FunSuite | ||
|
||
class EventLoopSuite extends FunSuite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you expand the tests for this class to include things like:
- Throwing an error within onError()
- Stopping the event loop if the onReceive() is inside something like a Lock#acquireUninterruptibly()
- Post events from different threads and make sure nothing throws a ConcurrentModificationException :)
Currently the implementation is simple, but I'd like to make sure that future changes don't break some property of event loops which the DAGScheduler rarely or never exercises.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stopping the event loop if the onReceive() is inside something like a Lock#acquireUninterruptibly()
It cannot interrupt acquireUninterruptibly
. I have not found any place in Spark using it. What test are you suggesting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I mean make sure that the event loop is eventually stopped despite stop() being called while the onReceive() was doing a busy-wait of clearing the interrupted flag, or calling an uninterruptible wait on a lock. However, it may be overly much trouble to set up such a condition and the new boolean flag probably works :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added test("EventLoop: onReceive swallows InterruptException")
to test clearing the interrupted flag.
Yes, of course, we can do this -- it's more-or-less going back to what we had before: 2539c06 I'm not seeing a lot of discussion about why we are considering this and the broader replacement of Akka. Is there some more discussion or motivation than what appears in the two JIRAs, SPARK-5124 and SPARK-5214? |
We are not removing Akka or considering removing Akka yet. It is just building it in a way that we can remove the dependency if we want to in the future. If we do consider, we will definitely have a broader discussion. If we ever do that, it'd be for making networking easier (both debugging and deployment), and enabling our users to use Akka (using Akka, especially a different version of it for an app on top of Spark is a mess right now. Spark not depending on Akka will make it easier for applications on top of Spark to use Akka). |
BTW I'm not sure why we ever did 2539c06 in the first place, other than making things more Scala-y. |
I hope that this is the beginning of a long, drawn-out series of commits that Akka-ize and de-Akka-ize various components of Spark, followed by the short but bloody Holy Akka Crusades, ultimately resulting in a complete rewrite of Spark in Erlang. |
Test build #25526 has started for PR 4016 at commit
|
} | ||
|
||
def stop(): Unit = { | ||
if (stopped.compareAndSet(false ,true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: comma space
Test build #25534 has started for PR 4016 at commit
|
Test build #25526 has finished for PR 4016 at commit
|
Test FAILed. |
Test build #25529 has finished for PR 4016 at commit
|
Test FAILed. |
Test build #25534 has finished for PR 4016 at commit
|
Test PASSed. |
val eventLoop = new EventLoop[Int]("test") { | ||
|
||
override def onReceive(event: Int): Unit = { | ||
receivedEventsCount += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is += safe on a volatile int? I wouldn't think it's actually rewritten as a compareAndSwap loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is += safe on a volatile int?
It's safe. onReceive
must be called in the event thread. Not concurrency here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
volatile
is used to make sure we can read receivedEventsCount
correctly outside the event thread.
LGTM, code-wise. You might run the unit tests many times locally to try to find race conditions. For similar situations, I've used something like
and just let it run for a while. |
Cool. Ran it 10 minutes in my machine and it was successful. |
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra line?
Test build #25630 has started for PR 4016 at commit
|
Test build #25630 has finished for PR 4016 at commit
|
Test PASSed. |
Thanks. Merging in master. |
This PR adds a simple `EventLoop` and use it to replace Actor in DAGScheduler. `EventLoop` is a general class to support that posting events in multiple threads and handling events in a single event thread. Author: zsxwing <[email protected]> Closes apache#4016 from zsxwing/event-loop and squashes the following commits: aefa1ce [zsxwing] Add protected to on*** methods 5cfac83 [zsxwing] Remove null check of eventProcessLoop dba35b2 [zsxwing] Add a test that onReceive swallows InterruptException 460f7b3 [zsxwing] Use volatile instead of Atomic things in unit tests 227bf33 [zsxwing] Add a stop flag and some tests 37f79c6 [zsxwing] Fix docs 55fb6f6 [zsxwing] Add private[spark] to EventLoop 1f73eac [zsxwing] Fix the import order 3b2e59c [zsxwing] Add EventLoop and change DAGScheduler to an EventLoop
This PR adds a simple
EventLoop
and use it to replace Actor in DAGScheduler.EventLoop
is a general class to support that posting events in multiple threads and handling events in a single event thread.