
Commit ce90ec8

merge in master
2 parents: bcdf3c9 + 48fc38f

182 files changed: +5707 −1963 lines

core/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -361,6 +361,16 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.novocode</groupId>
       <artifactId>junit-interface</artifactId>
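
The two Hamcrest artifacts above are test-scoped: hamcrest-core provides the basic matchers and hamcrest-library the richer ones (collections, strings, etc.). For illustration, a minimal JUnit 4 test using them might look like the following; this is a generic sketch, not a test from this commit, and the class name is made up.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;

import java.util.Arrays;

import org.junit.Test;

public class HamcrestExampleTest {
  @Test
  public void matchersReadLikeProse() {
    // hamcrest-core: basic matchers such as is() and equalTo().
    assertThat(1 + 1, is(equalTo(2)));
    // hamcrest-library: collection matchers such as hasItem().
    assertThat(Arrays.asList("a", "b"), hasItem("b"));
  }
}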
org/apache/spark/shuffle/unsafe/DummySerializerInstance.java (new file)

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.unsafe;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import scala.reflect.ClassTag;

import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.unsafe.PlatformDependent;

/**
 * Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
 * Our shuffle write path doesn't actually use this serializer (since we end up calling the
 * `write()` OutputStream methods), but DiskBlockObjectWriter still calls some methods on it.
 * To work around this, we pass a dummy no-op serializer.
 */
final class DummySerializerInstance extends SerializerInstance {

  public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();

  private DummySerializerInstance() { }

  @Override
  public SerializationStream serializeStream(final OutputStream s) {
    return new SerializationStream() {
      @Override
      public void flush() {
        // Need to implement this because DiskBlockObjectWriter uses it to flush the
        // compression stream.
        try {
          s.flush();
        } catch (IOException e) {
          PlatformDependent.throwException(e);
        }
      }

      @Override
      public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
        throw new UnsupportedOperationException();
      }

      @Override
      public void close() {
        // Need to implement this because DiskBlockObjectWriter uses it to close the
        // compression stream.
        try {
          s.close();
        } catch (IOException e) {
          PlatformDependent.throwException(e);
        }
      }
    };
  }

  @Override
  public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
    throw new UnsupportedOperationException();
  }

  @Override
  public DeserializationStream deserializeStream(InputStream s) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
    throw new UnsupportedOperationException();
  }
}
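
To make the delegation concrete: the dummy instance hands back a SerializationStream whose flush() and close() forward to the wrapped OutputStream, while every (de)serialization method throws UnsupportedOperationException. Below is a hypothetical usage sketch, not code from this commit; the demo class is made up and assumes it lives in the same package, since DummySerializerInstance is package-private.

package org.apache.spark.shuffle.unsafe;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.spark.serializer.SerializationStream;

public class DummySerializerInstanceDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    SerializationStream stream = DummySerializerInstance.INSTANCE.serializeStream(out);
    out.write(42);   // bytes go through the raw OutputStream, bypassing the serializer
    stream.flush();  // forwarded to out.flush()
    stream.close();  // forwarded to out.close()
    // stream.writeObject(...) would throw UnsupportedOperationException.
  }
}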
org/apache/spark/shuffle/unsafe/PackedRecordPointer.java (new file)

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.unsafe;

/**
 * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
 * <p>
 * Within the long, the data is laid out as follows:
 * <pre>
 *   [24 bit partition number][13 bit memory page number][27 bit offset in page]
 * </pre>
 * This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming
 * that our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based on the
 * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}),
 * this implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
 * <p>
 * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
 * optimization to future work as it will require more careful design to ensure that addresses
 * are properly aligned (e.g. by padding records).
 */
final class PackedRecordPointer {

  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes

  /**
   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
   */
  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215

  /** Bit mask for the lower 40 bits of a long. */
  private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;

  /** Bit mask for the upper 24 bits of a long. */
  private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;

  /** Bit mask for the lower 27 bits of a long. */
  private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;

  /** Bit mask for the lower 51 bits of a long. */
  private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;

  /** Bit mask for the upper 13 bits of a long. */
  private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;

  /**
   * Pack a record address and partition id into a single word.
   *
   * @param recordPointer a record pointer encoded by TaskMemoryManager.
   * @param partitionId a shuffle partition id (maximum value of 2^24 - 1).
   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
   */
  public static long packPointer(long recordPointer, int partitionId) {
    assert (partitionId <= MAXIMUM_PARTITION_ID);
    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
    // Also note that this relies on some internals of how TaskMemoryManager encodes its
    // addresses.
    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }

  private long packedRecordPointer;

  public void set(long packedRecordPointer) {
    this.packedRecordPointer = packedRecordPointer;
  }

  public int getPartitionId() {
    return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
  }

  public long getRecordPointer() {
    final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
    final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
    return pageNumber | offsetInPage;
  }
}
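
A concrete round-trip helps check the bit arithmetic: the packed word is [24-bit partition | 13-bit page | 27-bit offset], so packing page 5, offset 1024, partition 42 and unpacking should return exactly those values. The demo below is a hypothetical sketch, not code from this commit; it assumes the same package, since PackedRecordPointer is package-private.

package org.apache.spark.shuffle.unsafe;

public class PackedRecordPointerDemo {
  public static void main(String[] args) {
    // Build a TaskMemoryManager-style address: page number in the upper 13 bits,
    // offset in the lower bits (here the offset fits within the low 27 bits).
    final long pageNumber = 5L;
    final long offsetInPage = 1024L;
    final long recordPointer = (pageNumber << 51) | offsetInPage;

    final long packed = PackedRecordPointer.packPointer(recordPointer, 42);

    final PackedRecordPointer pointer = new PackedRecordPointer();
    pointer.set(packed);
    System.out.println(pointer.getPartitionId());                       // 42
    System.out.println(pointer.getRecordPointer() >>> 51);              // 5 (page number)
    System.out.println(pointer.getRecordPointer() & ((1L << 27) - 1));  // 1024 (offset)
  }
}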
org/apache/spark/shuffle/unsafe/SpillInfo.java (new file)

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.unsafe;

import java.io.File;

import org.apache.spark.storage.TempShuffleBlockId;

/**
 * Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
 */
final class SpillInfo {
  final long[] partitionLengths;
  final File file;
  final TempShuffleBlockId blockId;

  public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
    this.partitionLengths = new long[numPartitions];
    this.file = file;
    this.blockId = blockId;
  }
}
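
For context, here is a hypothetical sketch of how a spill writer might populate SpillInfo, recording the byte length of each partition as it is written so a later merge step can locate each partition's region in the file. This is illustrative only, not code from this commit; numPartitions, spillFile, tempBlockId, and writePartitionRecords are made-up placeholders.

// Hypothetical sketch: record per-partition byte lengths while writing a sorted spill file.
SpillInfo spill = new SpillInfo(numPartitions, spillFile, tempBlockId);
for (int partition = 0; partition < numPartitions; partition++) {
  // writePartitionRecords is an illustrative helper that writes this partition's
  // records to spillFile and returns the number of bytes written.
  spill.partitionLengths[partition] = writePartitionRecords(partition);
}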
