Skip to content

Commit 4d3395e

Browse files
rxinMarcelo Vanzin
authored andcommitted
[SPARK-6578] Small rewrite to make the logic more clear in MessageWithHeader.transferTo.
Author: Reynold Xin <[email protected]> Closes apache#5319 from rxin/SPARK-6578 and squashes the following commits: 7c62a64 [Reynold Xin] Small rewrite to make the logic more clear in transferTo. (cherry picked from commit 899ebcb)
1 parent 526f230 commit 4d3395e

File tree

1 file changed

+23
-20
lines changed

1 file changed

+23
-20
lines changed

network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121
import java.nio.channels.WritableByteChannel;
2222

2323
import com.google.common.base.Preconditions;
24-
import com.google.common.primitives.Ints;
2524
import io.netty.buffer.ByteBuf;
2625
import io.netty.channel.FileRegion;
2726
import io.netty.util.AbstractReferenceCounted;
2827
import io.netty.util.ReferenceCountUtil;
2928

3029
/**
31-
* A wrapper message that holds two separate pieces (a header and a body) to avoid
32-
* copying the body's content.
30+
* A wrapper message that holds two separate pieces (a header and a body).
31+
*
32+
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
3333
*/
3434
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
3535

@@ -63,32 +63,36 @@ public long transfered() {
6363
return totalBytesTransferred;
6464
}
6565

66+
/**
67+
* This code is more complicated than you would think because we might require multiple
68+
* transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
69+
*
70+
* The contract is that the caller will ensure position is properly set to the total number
71+
* of bytes transferred so far (i.e. value returned by transfered()).
72+
*/
6673
@Override
67-
public long transferTo(WritableByteChannel target, long position) throws IOException {
74+
public long transferTo(final WritableByteChannel target, final long position) throws IOException {
6875
Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
69-
long written = 0;
70-
71-
if (position < headerLength) {
72-
written += copyByteBuf(header, target);
76+
// Bytes written for header in this call.
77+
long writtenHeader = 0;
78+
if (header.readableBytes() > 0) {
79+
writtenHeader = copyByteBuf(header, target);
80+
totalBytesTransferred += writtenHeader;
7381
if (header.readableBytes() > 0) {
74-
totalBytesTransferred += written;
75-
return written;
82+
return writtenHeader;
7683
}
7784
}
7885

86+
// Bytes written for body in this call.
87+
long writtenBody = 0;
7988
if (body instanceof FileRegion) {
80-
// Adjust the position. If the write is happening as part of the same call where the header
81-
// (or some part of it) is written, `position` will be less than the header size, so we want
82-
// to start from position 0 in the FileRegion object. Otherwise, we start from the position
83-
// requested by the caller.
84-
long bodyPos = position > headerLength ? position - headerLength : 0;
85-
written += ((FileRegion)body).transferTo(target, bodyPos);
89+
writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
8690
} else if (body instanceof ByteBuf) {
87-
written += copyByteBuf((ByteBuf) body, target);
91+
writtenBody = copyByteBuf((ByteBuf) body, target);
8892
}
93+
totalBytesTransferred += writtenBody;
8994

90-
totalBytesTransferred += written;
91-
return written;
95+
return writtenHeader + writtenBody;
9296
}
9397

9498
@Override
@@ -102,5 +106,4 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept
102106
buf.skipBytes(written);
103107
return written;
104108
}
105-
106109
}

0 commit comments

Comments
 (0)