
Commit 21d7d93

Back out of BlockObjectWriter change
1 parent 7eafecf commit 21d7d93

File tree: 2 files changed (+50, −25 lines)


core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java

Lines changed: 49 additions & 17 deletions
@@ -17,7 +17,8 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 
 import scala.Tuple2;
 
@@ -31,15 +32,18 @@
 
 final class UnsafeSorterSpillWriter {
 
-  private static final int SER_BUFFER_SIZE = 1024 * 1024; // TODO: tune this
+  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
   static final int EOF_MARKER = -1;
 
-  private byte[] arr = new byte[SER_BUFFER_SIZE];
+  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
+  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
+  // data through a byte array. This array does not need to be large enough to hold a single
+  // record;
+  private byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
 
   private final File file;
   private final BlockId blockId;
   private BlockObjectWriter writer;
-  private DataOutputStream dos;
 
   public UnsafeSorterSpillWriter(
       BlockManager blockManager,
@@ -55,32 +59,60 @@ public UnsafeSorterSpillWriter(
     // around this, we pass a dummy no-op serializer.
     writer = blockManager.getDiskWriter(
       blockId, file, DummySerializerInstance.INSTANCE, fileBufferSize, writeMetrics);
-    dos = new DataOutputStream(writer);
+  }
+
+  // Based on DataOutputStream.writeLong.
+  private void writeLongToBuffer(long v, int offset) throws IOException {
+    writeBuffer[offset + 0] = (byte)(v >>> 56);
+    writeBuffer[offset + 1] = (byte)(v >>> 48);
+    writeBuffer[offset + 2] = (byte)(v >>> 40);
+    writeBuffer[offset + 3] = (byte)(v >>> 32);
+    writeBuffer[offset + 4] = (byte)(v >>> 24);
+    writeBuffer[offset + 5] = (byte)(v >>> 16);
+    writeBuffer[offset + 6] = (byte)(v >>> 8);
+    writeBuffer[offset + 7] = (byte)(v >>> 0);
+  }
+
+  // Based on DataOutputStream.writeInt.
+  private void writeIntToBuffer(int v, int offset) throws IOException {
+    writeBuffer[offset + 0] = (byte)(v >>> 24);
+    writeBuffer[offset + 1] = (byte)(v >>> 16);
+    writeBuffer[offset + 2] = (byte)(v >>> 8);
+    writeBuffer[offset + 3] = (byte)(v >>> 0);
   }
 
   public void write(
       Object baseObject,
       long baseOffset,
       int recordLength,
       long keyPrefix) throws IOException {
-    dos.writeInt(recordLength);
-    dos.writeLong(keyPrefix);
-    PlatformDependent.copyMemory(
-      baseObject,
-      baseOffset + 4,
-      arr,
-      PlatformDependent.BYTE_ARRAY_OFFSET,
-      recordLength);
-    writer.write(arr, 0, recordLength);
+    writeIntToBuffer(recordLength, 0);
+    writeLongToBuffer(keyPrefix, 4);
+    int dataRemaining = recordLength;
+    int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8;
+    long recordReadPosition = baseOffset + 4; // skip over record length
+    while (dataRemaining > 0) {
+      final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
+      PlatformDependent.copyMemory(
+        baseObject,
+        recordReadPosition,
+        writeBuffer,
+        PlatformDependent.BYTE_ARRAY_OFFSET + (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer),
+        toTransfer);
+      writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer) + toTransfer);
+      recordReadPosition += toTransfer;
+      dataRemaining -= toTransfer;
+      freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE;
+    }
     writer.recordWritten();
   }
 
   public void close() throws IOException {
-    dos.writeInt(EOF_MARKER);
+    writeIntToBuffer(EOF_MARKER, 0);
+    writer.write(writeBuffer, 0, 4);
     writer.commitAndClose();
     writer = null;
-    dos = null;
-    arr = null;
+    writeBuffer = null;
  }
 
   public long numberOfSpilledBytes() {
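
The new write() path above drains each record into the fixed-size writeBuffer in chunks, prepending a 4-byte length and an 8-byte key prefix in the same big-endian layout DataOutputStream would produce. Below is a minimal, self-contained sketch of that chunked-write pattern, with System.arraycopy standing in for PlatformDependent.copyMemory and a plain OutputStream standing in for BlockObjectWriter; the class name and the tiny buffer size are hypothetical, chosen only so the loop runs several times.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class ChunkedRecordWriterSketch {
  // Tiny buffer (must hold at least the 12-byte header) so the demo loops several times;
  // the real code uses 1024 * 1024.
  static final int DISK_WRITE_BUFFER_SIZE = 16;

  private final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
  private final OutputStream out;

  ChunkedRecordWriterSketch(OutputStream out) {
    this.out = out;
  }

  // Big-endian int, byte-for-byte what DataOutputStream.writeInt would produce.
  private void writeIntToBuffer(int v, int offset) {
    writeBuffer[offset + 0] = (byte) (v >>> 24);
    writeBuffer[offset + 1] = (byte) (v >>> 16);
    writeBuffer[offset + 2] = (byte) (v >>> 8);
    writeBuffer[offset + 3] = (byte) (v >>> 0);
  }

  // Big-endian long, matching DataOutputStream.writeLong.
  private void writeLongToBuffer(long v, int offset) {
    for (int i = 0; i < 8; i++) {
      writeBuffer[offset + i] = (byte) (v >>> (56 - 8 * i));
    }
  }

  // Writes [length][keyPrefix][record bytes], draining the record through the small buffer.
  // The 12-byte header goes out with the first chunk, so a non-empty record is assumed.
  void write(byte[] record, long keyPrefix) throws IOException {
    writeIntToBuffer(record.length, 0);
    writeLongToBuffer(keyPrefix, 4);
    int dataRemaining = record.length;
    int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8; // header occupies 12 bytes
    int recordReadPosition = 0;
    while (dataRemaining > 0) {
      final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
      System.arraycopy(
        record, recordReadPosition,
        writeBuffer, DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer,
        toTransfer);
      out.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer) + toTransfer);
      recordReadPosition += toTransfer;
      dataRemaining -= toTransfer;
      // Once the header has gone out with the first chunk, the whole buffer is free.
      freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE;
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    byte[] record = new byte[40]; // larger than the buffer, so several iterations are needed
    new ChunkedRecordWriterSketch(sink).write(record, 42L);
    System.out.println("bytes written: " + sink.size()); // 12-byte header + 40 payload = 52
  }
}

Like the diff's write(), the sketch writes the header and the first chunk in a single call, then reuses the whole buffer for subsequent chunks, so no record ever needs to fit in the buffer at once.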

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 1 addition & 8 deletions
@@ -211,14 +211,7 @@ private[spark] class DiskBlockObjectWriter(
     recordWritten()
   }
 
-  override def write(b: Int): Unit = {
-    // TOOD: re-enable the `throw new UnsupportedOperationException()` here
-    if (!initialized) {
-      open()
-    }
-
-    bs.write(b)
-  }
+  override def write(b: Int): Unit = throw new UnsupportedOperationException()
 
   override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
     if (!initialized) {
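
With the single-byte path backed out, DiskBlockObjectWriter.write(b: Int) once again fails fast, since the spill writer above now does its own buffering and only ever issues bulk array writes. As a rough illustration of the same fail-fast design, here is a short Java sketch; the wrapper class is hypothetical, not Spark API.

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical wrapper: bulk writes pass through, per-byte writes fail fast,
// mirroring DiskBlockObjectWriter.write(b: Int) after this commit.
public class BulkOnlyOutputStream extends OutputStream {
  private final OutputStream underlying;

  public BulkOnlyOutputStream(OutputStream underlying) {
    this.underlying = underlying;
  }

  @Override
  public void write(int b) {
    // Per-byte writes would silently turn into very inefficient tiny I/O.
    throw new UnsupportedOperationException("use write(byte[], int, int)");
  }

  @Override
  public void write(byte[] bytes, int offset, int length) throws IOException {
    underlying.write(bytes, offset, length);
  }
}

Throwing from write(b: Int) turns any accidental per-byte caller into an immediate error instead of a performance bug.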
