Skip to content

Commit c9c2e4e

Browse files
author
Marcelo Vanzin
committed
Review comments: simplify some code.
1 parent 9c888ac commit c9c2e4e

File tree

3 files changed

+13
-15
lines changed

3 files changed

+13
-15
lines changed

network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) {
7373
assert header.writableBytes() == 0;
7474

7575
if (body != null && bodyLength > 0) {
76-
out.add(new MessageWithHeader(header, headerLength, body, bodyLength));
76+
out.add(new MessageWithHeader(header, body, bodyLength));
7777
} else {
7878
out.add(header);
7979
}

network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
3939
private final long bodyLength;
4040
private int totalBytesTransferred;
4141

42-
MessageWithHeader(ByteBuf header, int headerLength, Object body, long bodyLength) {
42+
MessageWithHeader(ByteBuf header, Object body, long bodyLength) {
4343
Preconditions.checkArgument(body instanceof ByteBuf || body instanceof FileRegion,
4444
"Body must be a ByteBuf or a FileRegion.");
4545
this.header = header;
46-
this.headerLength = headerLength;
46+
this.headerLength = header.readableBytes();
4747
this.body = body;
4848
this.bodyLength = bodyLength;
4949
}
@@ -65,22 +65,26 @@ public long transfered() {
6565

6666
@Override
6767
public long transferTo(WritableByteChannel target, long position) throws IOException {
68-
Preconditions.checkArgument(position >= 0 && position < count(), "Invalid position.");
68+
Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
6969
long written = 0;
7070

7171
if (position < headerLength) {
72-
written += copyByteBuf(header, target, position);
72+
written += copyByteBuf(header, target);
7373
if (header.readableBytes() > 0) {
7474
totalBytesTransferred += written;
7575
return written;
7676
}
7777
}
7878

7979
if (body instanceof FileRegion) {
80+
// Adjust the position. If the write is happening as part of the same call where the header
81+
// (or some part of it) is written, `position` will be less than the header size, so we want
82+
// to start from position 0 in the FileRegion object. Otherwise, we start from the position
83+
// requested by the caller.
8084
long bodyPos = position > headerLength ? position - headerLength : 0;
8185
written += ((FileRegion)body).transferTo(target, bodyPos);
8286
} else if (body instanceof ByteBuf) {
83-
written += copyByteBuf((ByteBuf) body, target, position);
87+
written += copyByteBuf((ByteBuf) body, target);
8488
}
8589

8690
totalBytesTransferred += written;
@@ -93,12 +97,7 @@ protected void deallocate() {
9397
ReferenceCountUtil.release(body);
9498
}
9599

96-
private int copyByteBuf(ByteBuf buf, WritableByteChannel target, long position)
97-
throws IOException {
98-
99-
if (position > totalBytesTransferred) {
100-
buf.skipBytes(Ints.checkedCast(position - totalBytesTransferred));
101-
}
100+
private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
102101
int written = target.write(buf.nioBuffer());
103102
buf.skipBytes(written);
104103
return written;

network/common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public void testShortWrite() throws Exception {
4747
public void testByteBufBody() throws Exception {
4848
ByteBuf header = Unpooled.copyLong(42);
4949
ByteBuf body = Unpooled.copyLong(84);
50-
MessageWithHeader msg = new MessageWithHeader(header, header.readableBytes(), body,
51-
body.readableBytes());
50+
MessageWithHeader msg = new MessageWithHeader(header, body, body.readableBytes());
5251

5352
ByteBuf result = doWrite(msg, 1);
5453
assertEquals(msg.count(), result.readableBytes());
@@ -60,7 +59,7 @@ private void testFileRegionBody(int totalWrites, int writesPerCall) throws Excep
6059
ByteBuf header = Unpooled.copyLong(42);
6160
int headerLength = header.readableBytes();
6261
TestFileRegion region = new TestFileRegion(totalWrites, writesPerCall);
63-
MessageWithHeader msg = new MessageWithHeader(header, headerLength, region, region.count());
62+
MessageWithHeader msg = new MessageWithHeader(header, region, region.count());
6463

6564
ByteBuf result = doWrite(msg, totalWrites / writesPerCall);
6665
assertEquals(headerLength + region.count(), result.readableBytes());

0 commit comments

Comments
 (0)