[SPARK-6578] [core] Fix thread-safety issue in outbound path of network library. #5234
While the inbound path of a netty pipeline is thread-safe, the outbound path is not. That means that multiple threads can compete to write messages to the next stage of the pipeline.

The network library sometimes breaks a single RPC message into multiple buffers internally to avoid copying data (see MessageEncoder). This can result in the following scenario (where "FxBy" means "frame x, buffer y"):

    T1         F1B1            F1B2
                 \               \
                  \               \
    socket        F1B1   F2B1    F1B2  F2B2
                          /               /
                         /               /
    T2                  F2B1            F2B2

And the frames now cannot be rebuilt on the receiving side because the different messages have been mixed up on the wire.

The fix introduces a new stage in the pipeline that acts as a "demultiplexer": multi-buffer frames are expected to navigate the pipeline as a List, and the new handler will write all buffers in the List to the next stage in a thread-safe manner.
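To make the lock-based idea in this description concrete, here is a minimal sketch in plain Java. It does not use netty and is not the code from this PR: `FrameListWriter`, `writeFrame`, and `framesIntact` are hypothetical names, the "wire" is just a list of strings, and each frame is assumed to have exactly two buffers. It only illustrates that writing all buffers of a frame under one lock keeps frames contiguous.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed handler; not the code from this PR.
class FrameListWriter {
    // Stand-in for the next pipeline stage: records buffers in arrival order.
    private final List<String> wire = new ArrayList<>();

    // Writes every buffer of one frame under a single lock, so another
    // thread's frame cannot slip in between two buffers of this one.
    synchronized void writeFrame(List<String> buffers) {
        for (String buffer : buffers) {
            wire.add(buffer);
        }
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(wire);
    }

    // True if the wire is a sequence of complete frames: each run of
    // buffersPerFrame entries shares a frame id and is numbered B1..Bn.
    static boolean framesIntact(List<String> wire, int buffersPerFrame) {
        if (wire.size() % buffersPerFrame != 0) {
            return false;
        }
        for (int i = 0; i < wire.size(); i += buffersPerFrame) {
            String first = wire.get(i);
            String frameId = first.substring(0, first.lastIndexOf('B'));
            for (int b = 0; b < buffersPerFrame; b++) {
                if (!wire.get(i + b).equals(frameId + "B" + (b + 1))) {
                    return false;
                }
            }
        }
        return true;
    }

    // Two threads (T1, T2) each write framesPerThread two-buffer frames.
    static List<String> runTwoWriters(int framesPerThread) {
        FrameListWriter writer = new FrameListWriter();
        Thread[] threads = new Thread[2];
        for (int t = 0; t < threads.length; t++) {
            String prefix = "T" + (t + 1) + "F";
            threads[t] = new Thread(() -> {
                for (int f = 0; f < framesPerThread; f++) {
                    String id = prefix + f;
                    writer.writeFrame(Arrays.asList(id + "B1", id + "B2"));
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writer.snapshot();
    }

    public static void main(String[] args) {
        // Frames written through the locked writer stay contiguous.
        System.out.println(framesIntact(runTwoWriters(1000), 2));
        // The interleaving from the diagram above is rejected.
        System.out.println(framesIntact(Arrays.asList("F1B1", "F2B1", "F1B2", "F2B2"), 2));
    }
}
```

The second check in `main` feeds in the exact ordering from the diagram (F1B1 F2B1 F1B2 F2B2), which is the case the receiving side cannot reassemble.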
Test build #29321 has started for PR 5234 at commit
Test build #29323 has started for PR 5234 at commit
It's been too long since the electronics class in college, I guess.
Test build #29324 has started for PR 5234 at commit
Test build #29321 has finished for PR 5234 at commit
Test PASSed. |
Test build #29323 has finished for PR 5234 at commit
Test PASSed. |
Test build #29324 has finished for PR 5234 at commit
Test PASSed. |
BTW, if anyone cares to look at it, I wrote some code that shows the race exists; you can see it here. I didn't post it as part of the PR because it's a contrived test that doesn't really test the network library, but is just meant to show what happened when the bug was actually triggered before. |
This seems to me a pretty serious problem with our current implementation, and a limitation of Netty w.r.t. the use of zero-copy. If I am understanding this correctly, the problem is that a Netty Channel cannot accept a composite buffer that includes a ByteBuf and a FileRegion at the same time. As a result, we split a data transfer message into two messages: one for the header, and one for the data itself, sent using transferTo.

Netty already goes most of the way to make sure users don't have to worry about linearizability and concurrency, but in this case we'd have to resort to locking in order to guarantee atomicity of a message that gets divided into two. If Netty had some support structure for this, it'd greatly simplify our programming model and eliminate the locking required at our level. It looks like it would take an extra if statement that checks whether the "msg" is a special object that contains a ByteBuf and a FileRegion: https://github.com/netty/netty/blob/33f75d374091946058fc334c3cdbcd0f0a59d9b3/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L189

cc @normanmaurer and @trustin, any comment on this?
@vanzin any idea on the performance impact of this patch?
Adding a lock is always a little bit scary, but I wouldn't expect a lot of overhead from this code, given that netty itself has to do context switching to actually write the data to the socket. An easy optimization would be to avoid the lock when the input list has a single element, since in that case we don't care about the ordering. One question I have is: does netty have some interface/method that I can implement to make any object directly writable to a socket? e.g. the If there is something like that, then we can avoid the lock by wrapping the two messages created by |
You can implement FileRegion
Ah that makes sense. We can just add the header to our own LazyFileRegion. It won't work with the native epoll transport, but should be fine for nio since it is the default anyway. |
@vanzin do you have time to do the fix suggested by @normanmaurer? If you do, I can review it tonight and merge it. If you don't, Aaron or I will probably find time to fix it tomorrow. The idea is to add a header ByteBuf to LazyFileRegion, and in transferTo transfer both the header and the body. Then MessageEncoder can return either a single ByteBuf or a single LazyFileRegion. |
Yeah, I hope to take a look at this after lunch. |
Netty treats FileRegion by calling its "transferTo" method to copy data to the destination. We can use this to wrap the whole outgoing message into a single one, to avoid the problems that trying to send two different messages would cause. This also avoids having a lock in the user-level threads. Also add a log4j config (and update pom) so that we get logs from unit test runs.
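As a rough illustration of the approach in this commit message, the sketch below combines a header and a body behind a single `transferTo`-style method using only `java.nio`. It is not the `MessageWithHeader` class from this PR and does not implement netty's `FileRegion` interface: `HeaderAndBodyRegion`, `ThrottledChannel`, and `writeFully` are hypothetical names, and the body is assumed to be an in-memory `ByteBuffer` rather than a file. The point it shows is that the caller sees one object and may call `transferTo` repeatedly until every byte of header and body has been written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch: one writable unit made of a header plus a body.
class HeaderAndBodyRegion {
    private final ByteBuffer header;
    private final ByteBuffer body;
    private final long count;
    private long totalBytesTransferred; // long, as suggested in review

    HeaderAndBodyRegion(ByteBuffer header, ByteBuffer body) {
        this.header = header.duplicate();
        this.body = body.duplicate();
        this.count = (long) this.header.remaining() + this.body.remaining();
    }

    long count() { return count; }

    long transferred() { return totalBytesTransferred; }

    // Modeled on the shape of FileRegion.transferTo(target, position):
    // may write only part of the data, so it can be called several times.
    long transferTo(WritableByteChannel target, long position) throws IOException {
        if (position != totalBytesTransferred) {
            throw new IllegalArgumentException("Invalid position.");
        }
        long written = 0;
        if (header.hasRemaining()) {
            written += target.write(header);
            if (header.hasRemaining()) {
                // Target is full; finish the header on the next call.
                totalBytesTransferred += written;
                return written;
            }
        }
        written += target.write(body);
        totalBytesTransferred += written;
        return written;
    }

    // Test channel that accepts at most maxBytesPerWrite bytes per call,
    // imitating a socket that cannot take the whole message at once.
    static final class ThrottledChannel implements WritableByteChannel {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final int maxBytesPerWrite;

        ThrottledChannel(int maxBytesPerWrite) { this.maxBytesPerWrite = maxBytesPerWrite; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(maxBytesPerWrite, src.remaining());
            for (int i = 0; i < n; i++) {
                out.write(src.get());
            }
            return n;
        }

        @Override public boolean isOpen() { return true; }

        @Override public void close() { }

        byte[] bytes() { return out.toByteArray(); }
    }

    // Drives transferTo until everything is written; maxBytesPerWrite must be > 0.
    static byte[] writeFully(byte[] header, byte[] body, int maxBytesPerWrite) {
        HeaderAndBodyRegion region =
            new HeaderAndBodyRegion(ByteBuffer.wrap(header), ByteBuffer.wrap(body));
        ThrottledChannel target = new ThrottledChannel(maxBytesPerWrite);
        try {
            while (region.transferred() < region.count()) {
                region.transferTo(target, region.transferred());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.bytes();
    }
}
```

The throttled channel forces several partial writes, which is the situation the later commit "Fix multiple calls to MessageWithHeader.transferTo()" appears to address.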
Ok, I implemented the |
BTW if you're ok with this approach I'll need to update the change description since it still references the lock-based approach. |
Test build #29424 has started for PR 5234 at commit |
Test FAILed. |
Test build #29437 has started for PR 5234 at commit |
I'm trying to run unit tests locally but it seems something is busted somewhere else. e.g. I get this:
I'll try to dig more into this later, but so far I don't see a connection between my code and the current test issues I'm running into. |
Test build #29480 has started for PR 5234 at commit |
Test FAILed. |
Test build #29497 has started for PR 5234 at commit |
Test build #29497 has finished for PR 5234 at commit
Test PASSed. |
Test build #29505 has started for PR 5234 at commit |
  private final int headerLength;
  private final Object body;
  private final long bodyLength;
  private int totalBytesTransferred;
I think this should be a long
Ah, yes.
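For context on why the counter type matters, here is a small standalone sketch (the class and method names are hypothetical, not from the PR). In Java, a compound assignment such as `total += written` with an `int` on the left and a `long` on the right compiles without a cast and silently narrows the result, so an `int` counter wraps around once more than `Integer.MAX_VALUE` bytes have been transferred.

```java
// Hypothetical demo of an int byte counter overflowing; not code from the PR.
class TransferCounterOverflow {
    // Compiles without a cast: "+=" implicitly narrows the long sum back to int.
    static int addAsInt(int total, long written) {
        total += written;
        return total;
    }

    // With a long counter the sum is kept exactly.
    static long addAsLong(long total, long written) {
        return total + written;
    }

    public static void main(String[] args) {
        System.out.println(addAsInt(Integer.MAX_VALUE, 1L));  // wraps to Integer.MIN_VALUE
        System.out.println(addAsLong(Integer.MAX_VALUE, 1L)); // 2147483648
    }
}
```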
@normanmaurer do you want me to create a ticket for netty for tracking? |
Test build #29505 has finished for PR 5234 at commit
Test PASSed. |
Test build #29513 has started for PR 5234 at commit |
Test build #29513 has finished for PR 5234 at commit
Test PASSed. |
  }

  private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
    int written = target.write(buf.nioBuffer());
Is this actually safe? nioBuffer can create a duplicate each time, and thus resetting the offset?
What do you mean by "resetting the offset"? The buffer returned by nioBuffer() does not change the state of the ByteBuf, which is why you have the skipBytes call in the next line.
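The same pattern can be shown with plain `java.nio`, used here only as an analogy: `ByteBuffer.duplicate()` plays the role of `nioBuffer()` (a view with its own position) and an explicit `position(...)` update plays the role of `skipBytes`. The class name `DuplicateViewWrite` and its helper are hypothetical; netty itself is not used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

// Analogy only: duplicate() stands in for ByteBuf.nioBuffer().
class DuplicateViewWrite {
    // Returns {bytes written, source position before the skip, position after}.
    static int[] writeThroughView() {
        ByteBuffer buf = ByteBuffer.wrap("abcdef".getBytes(StandardCharsets.US_ASCII));
        WritableByteChannel target = Channels.newChannel(new ByteArrayOutputStream());
        try {
            // Writing consumes the view, not the original buffer.
            ByteBuffer view = buf.duplicate();
            int written = target.write(view);
            int positionBeforeSkip = buf.position();

            // Equivalent of skipBytes(written): advance the original by hand.
            buf.position(buf.position() + written);
            return new int[] {written, positionBeforeSkip, buf.position()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = writeThroughView();
        System.out.println("written=" + r[0] + " before=" + r[1] + " after=" + r[2]);
    }
}
```

Without the explicit advance, a second call would create a fresh view starting at the same offset and write the same bytes again.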
ah nvm i thought this was a ManagedBuffer
Thanks. Merging in master & branch-1.3. I think there is a minor change we can do to make the code more clear, but I will just submit a small PR for that myself. |
…rk library.

While the inbound path of a netty pipeline is thread-safe, the outbound path is not. That means that multiple threads can compete to write messages to the next stage of the pipeline.

The network library sometimes breaks a single RPC message into multiple buffers internally to avoid copying data (see MessageEncoder). This can result in the following scenario (where "FxBy" means "frame x, buffer y"):

    T1         F1B1            F1B2
                 \               \
                  \               \
    socket        F1B1   F2B1    F1B2  F2B2
                          /               /
                         /               /
    T2                  F2B1            F2B2

And the frames now cannot be rebuilt on the receiving side because the different messages have been mixed up on the wire.

The fix wraps these multi-buffer messages into a `FileRegion` object so that these messages are written "atomically" to the next pipeline handler.

Author: Marcelo Vanzin <[email protected]>

Closes #5234 from vanzin/SPARK-6578 and squashes the following commits:

16b2d70 [Marcelo Vanzin] Forgot to update a type.
c9c2e4e [Marcelo Vanzin] Review comments: simplify some code.
9c888ac [Marcelo Vanzin] Small style nits.
8474bab [Marcelo Vanzin] Fix multiple calls to MessageWithHeader.transferTo().
e26509f [Marcelo Vanzin] Merge branch 'master' into SPARK-6578
c503f6c [Marcelo Vanzin] Implement a custom FileRegion instead of using locks.
84aa7ce [Marcelo Vanzin] Rename handler to the correct name.
432f3bd [Marcelo Vanzin] Remove unneeded method.
8d70e60 [Marcelo Vanzin] Fix thread-safety issue in outbound path of network library.

(cherry picked from commit f084c5d)
Signed-off-by: Reynold Xin <[email protected]>
…rk library. While the inbound path of a netty pipeline is thread-safe, the outbound path is not. That means that multiple threads can compete to write messages to the next stage of the pipeline. The network library sometimes breaks a single RPC message into multiple buffers internally to avoid copying data (see MessageEncoder). This can result in the following scenario (where "FxBy" means "frame x, buffer y"): T1 F1B1 F1B2 \ \ \ \ socket F1B1 F2B1 F1B2 F2B2 / / / / T2 F2B1 F2B2 And the frames now cannot be rebuilt on the receiving side because the different messages have been mixed up on the wire. The fix wraps these multi-buffer messages into a `FileRegion` object so that these messages are written "atomically" to the next pipeline handler. Author: Marcelo Vanzin <[email protected]> Closes apache#5234 from vanzin/SPARK-6578 and squashes the following commits: 16b2d70 [Marcelo Vanzin] Forgot to update a type. c9c2e4e [Marcelo Vanzin] Review comments: simplify some code. 9c888ac [Marcelo Vanzin] Small style nits. 8474bab [Marcelo Vanzin] Fix multiple calls to MessageWithHeader.transferTo(). e26509f [Marcelo Vanzin] Merge branch 'master' into SPARK-6578 c503f6c [Marcelo Vanzin] Implement a custom FileRegion instead of using locks. 84aa7ce [Marcelo Vanzin] Rename handler to the correct name. 432f3bd [Marcelo Vanzin] Remove unneeded method. 8d70e60 [Marcelo Vanzin] Fix thread-safety issue in outbound path of network library. (cherry picked from commit f084c5d) Conflicts: network/common/pom.xml
While the inbound path of a netty pipeline is thread-safe, the outbound path is not. That means that multiple threads can compete to write messages to the next stage of the pipeline.

The network library sometimes breaks a single RPC message into multiple buffers internally to avoid copying data (see MessageEncoder). This can result in the following scenario (where "FxBy" means "frame x, buffer y"):

    T1         F1B1            F1B2
                 \               \
                  \               \
    socket        F1B1   F2B1    F1B2  F2B2
                          /               /
                         /               /
    T2                  F2B1            F2B2

And the frames now cannot be rebuilt on the receiving side because the different messages have been mixed up on the wire.

The fix wraps these multi-buffer messages into a FileRegion object so that these messages are written "atomically" to the next pipeline handler.