-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-19365][Core]Optimize RequestMessage serialization #16706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
*/ | ||
private[netty] case class RequestMessage( | ||
senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any) | ||
private[netty] class RequestMessage( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed case
to make RequestMessage
non-serializable to avoid using Java serialization occasionally.
|
||
@transient @volatile var client: TransportClient = _ | ||
|
||
private val _address = if (endpointAddress.rpcAddress != null) endpointAddress else null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed _address
and _name
to save some bytes.
out.writeUTF(senderAddress.host) | ||
out.writeInt(senderAddress.port) | ||
} | ||
val receiverAddress = receiver.endpointAddress |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Write receiver.endpointAddress
rather than NettyRpcEndpointRef
since we only need the address to recreate them.
Test build #72008 has finished for PR 16706 at commit
|
* @param name Name of the endpoint. | ||
*/ | ||
private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) { | ||
private[spark] case class RpcEndpointAddress(@Nullable rpcAddress: RpcAddress, name: String) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Nullable here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I was working on this, I found I need to read codes to understand if this is nullable. Hence, I just added this annotation to improve the document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue is that we don't use this otherwise in Spark and it's not actually a JDK class. It's probably not worth using it in a handful of places only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 we could add a comment there instead of pulling in a new annotation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true. But since NettyRpcEnv already used it (added by @vanzin), I prefer to also use it in other places under the netty rpc package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okey. I removed Nullable
.
endpointAddress: RpcEndpointAddress, | ||
@transient @volatile private var nettyEnv: NettyRpcEnv) | ||
extends RpcEndpointRef(conf) with Serializable with Logging { | ||
val endpointAddress: RpcEndpointAddress, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
while we're at it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
val receiver: NettyRpcEndpointRef, val content: Any) { | ||
|
||
/** Manually serialize [[RequestMessage]] to minimize the size of bytes. */ | ||
def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't you just want to implement the standard Java serialization mechanism here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's different. If I just implement writeObject
and call Java serialization APIs to write RequestMessage
, it will write the full class name RequestMessage
and a serialization id which are not needed by all RPC messages.
cc @vanzin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall the change seems fine to me. However I wonder if a more general change is possible where we say do something like a 'closure cleaning' on the input or output to java serialization to reduce the class information written. But I guess we can discuss this in a separate JIRA
* @param name Name of the endpoint. | ||
*/ | ||
private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) { | ||
private[spark] case class RpcEndpointAddress(@Nullable rpcAddress: RpcAddress, name: String) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 we could add a comment there instead of pulling in a new annotation
Sounds like implementing Kryo :) |
Test build #72036 has finished for PR 16706 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks reasonable given the improvements. It'd be nice to have a targeted unit test for the new code in RequestMessage, though.
senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any) | ||
private[netty] class RequestMessage( | ||
val senderAddress: RpcAddress, | ||
val receiver: NettyRpcEndpointRef, val content: Any) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move content
to next line
val senderAddress: RpcAddress, | ||
val receiver: NettyRpcEndpointRef, val content: Any) { | ||
|
||
/** Manually serialize [[RequestMessage]] to minimize the size of bytes. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove "of bytes".
writeRpcAddress(out, senderAddress) | ||
writeRpcAddress(out, receiver.address) | ||
out.writeUTF(receiver.name) | ||
val contentBytes = nettyEnv.serialize(content) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... could you use JavaSerializerInstance.serializeStream
here instead?
You avoid: extra object allocations in serialize
, two copies of the serialized content in memory, and the extra copy operation below in out.write
.
You could also use ObjectOutputStream
directly (it implements DataOutput
) but that makes it difficult to use Kryo later.
Test build #72047 has finished for PR 16706 at commit
|
Test build #72065 has started for PR 16706 at commit |
retest this please |
2 similar comments
retest this please |
retest this please |
Test build #72083 has finished for PR 16706 at commit
|
LGTM. |
Thanks! Merging to master. |
## What changes were proposed in this pull request? Right now Netty PRC serializes `RequestMessage` using Java serialization, and the size of a single message (e.g., RequestMessage(..., "hello")`) is almost 1KB. This PR optimizes it by serializing `RequestMessage` manually (eliminate unnecessary information from most messages, e.g., class names of `RequestMessage`, `NettyRpcEndpointRef`, ...), and reduces the above message size to 100+ bytes. ## How was this patch tested? Jenkins I did a simple test to measure the improvement: Before ``` $ bin/spark-shell --master local-cluster[1,4,1024] ... scala> for (i <- 1 to 10) { | val start = System.nanoTime | val s = sc.parallelize(1 to 1000000, 10 * 1000).count() | val end = System.nanoTime | println(s"$i\t" + ((end - start)/1000/1000)) | } 1 6830 2 4353 3 3322 4 3107 5 3235 6 3139 7 3156 8 3166 9 3091 10 3029 ``` After: ``` $ bin/spark-shell --master local-cluster[1,4,1024] ... scala> for (i <- 1 to 10) { | val start = System.nanoTime | val s = sc.parallelize(1 to 1000000, 10 * 1000).count() | val end = System.nanoTime | println(s"$i\t" + ((end - start)/1000/1000)) | } 1 6431 2 3643 3 2913 4 2679 5 2760 6 2710 7 2747 8 2793 9 2679 10 2651 ``` I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB. Author: Shixiong Zhu <[email protected]> Closes apache#16706 from zsxwing/rpc-opt.
What changes were proposed in this pull request?
Right now Netty PRC serializes
RequestMessage
using Java serialization, and the size of a single message (e.g., RequestMessage(..., "hello")`) is almost 1KB.This PR optimizes it by serializing
RequestMessage
manually (eliminate unnecessary information from most messages, e.g., class names ofRequestMessage
,NettyRpcEndpointRef
, ...), and reduces the above message size to 100+ bytes.How was this patch tested?
Jenkins
I did a simple test to measure the improvement:
Before
After:
I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB.