Skip to content

Commit b6ebef9

Browse files
7mming7 authored and zheniantoushipashi committed
ae skew join (apache#117)
* ae skew join * add metrics querystage * Do not support a union node with differing numbers of partitions if we explicitly repartition them. * fix style * release r40
1 parent 1fa9242 commit b6ebef9

File tree

109 files changed

+3832
-807
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+3832
-807
lines changed

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>org.apache.spark</groupId>
2323
<artifactId>spark-parent_2.11</artifactId>
24-
<version>2.4.1-kylin-r35</version>
24+
<version>2.4.1-kylin-r40</version>
2525
<relativePath>../pom.xml</relativePath>
2626
</parent>
2727

common/kvstore/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-r35</version>
25+
<version>2.4.1-kylin-r40</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

common/network-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-r35</version>
25+
<version>2.4.1-kylin-r40</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

common/network-shuffle/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-r35</version>
25+
<version>2.4.1-kylin-r40</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -197,48 +197,60 @@ public Map<String, Metric> getMetrics() {
197197
}
198198
}
199199

200+
private boolean isShuffleBlock(String[] blockIdParts) {
201+
// length == 4: ShuffleBlockId
202+
// length == 5: ContinuousShuffleBlockId
203+
return (blockIdParts.length == 4 || blockIdParts.length == 5) &&
204+
blockIdParts[0].equals("shuffle");
205+
}
206+
200207
private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
201208

202209
private int index = 0;
203210
private final String appId;
204211
private final String execId;
205212
private final int shuffleId;
206-
// An array containing mapId and reduceId pairs.
207-
private final int[] mapIdAndReduceIds;
213+
// A flattened array of (mapId, reduceId, numBlocks) triples, one triple per requested block.
214+
private final int[] shuffleBlockIds;
208215

209216
ManagedBufferIterator(String appId, String execId, String[] blockIds) {
210217
this.appId = appId;
211218
this.execId = execId;
212219
String[] blockId0Parts = blockIds[0].split("_");
213-
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
220+
if (!isShuffleBlock(blockId0Parts)) {
214221
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
215222
}
216223
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
217-
mapIdAndReduceIds = new int[2 * blockIds.length];
224+
shuffleBlockIds = new int[3 * blockIds.length];
218225
for (int i = 0; i < blockIds.length; i++) {
219226
String[] blockIdParts = blockIds[i].split("_");
220-
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
227+
if (!isShuffleBlock(blockIdParts)) {
221228
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
222229
}
223230
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
224231
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
225-
", got:" + blockIds[i]);
232+
", got:" + blockIds[i]);
233+
}
234+
shuffleBlockIds[3 * i] = Integer.parseInt(blockIdParts[2]);
235+
shuffleBlockIds[3 * i + 1] = Integer.parseInt(blockIdParts[3]);
236+
if (blockIdParts.length == 4) {
237+
shuffleBlockIds[3 * i + 2] = 1;
238+
} else {
239+
shuffleBlockIds[3 * i + 2] = Integer.parseInt(blockIdParts[4]);
226240
}
227-
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
228-
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
229241
}
230242
}
231243

232244
@Override
233245
public boolean hasNext() {
234-
return index < mapIdAndReduceIds.length;
246+
return index < shuffleBlockIds.length;
235247
}
236248

237249
@Override
238250
public ManagedBuffer next() {
239251
final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
240-
mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
241-
index += 2;
252+
shuffleBlockIds[index], shuffleBlockIds[index + 1], shuffleBlockIds[index + 2]);
253+
index += 3;
242254
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
243255
return block;
244256
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -162,21 +162,22 @@ public void registerExecutor(
162162
}
163163

164164
/**
165-
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
165+
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
166166
* about how the hash and sort based shuffles store their data.
167167
*/
168168
public ManagedBuffer getBlockData(
169-
String appId,
170-
String execId,
171-
int shuffleId,
172-
int mapId,
173-
int reduceId) {
169+
String appId,
170+
String execId,
171+
int shuffleId,
172+
int mapId,
173+
int reduceId,
174+
int numBlocks) {
174175
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
175176
if (executor == null) {
176177
throw new RuntimeException(
177-
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
178+
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
178179
}
179-
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
180+
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
180181
}
181182

182183
/**
@@ -280,19 +281,19 @@ public boolean accept(File dir, String name) {
280281
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
281282
*/
282283
private ManagedBuffer getSortBasedShuffleBlockData(
283-
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
284+
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) {
284285
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
285-
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
286+
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
286287

287288
try {
288289
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
289-
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
290+
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId, numBlocks);
290291
return new FileSegmentManagedBuffer(
291-
conf,
292-
getFile(executor.localDirs, executor.subDirsPerLocalDir,
293-
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
294-
shuffleIndexRecord.getOffset(),
295-
shuffleIndexRecord.getLength());
292+
conf,
293+
getFile(executor.localDirs, executor.subDirsPerLocalDir,
294+
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
295+
shuffleIndexRecord.getOffset(),
296+
shuffleIndexRecord.getLength());
296297
} catch (ExecutionException e) {
297298
throw new RuntimeException("Failed to open file: " + indexFile, e);
298299
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public int getSize() {
5959
/**
6060
* Get the index record spanning numBlocks consecutive reducers, starting at reduceId.
6161
*/
62-
public ShuffleIndexRecord getIndex(int reduceId) {
62+
public ShuffleIndexRecord getIndex(int reduceId, int numBlocks) {
6363
long offset = offsets.get(reduceId);
64-
long nextOffset = offsets.get(reduceId + 1);
64+
long nextOffset = offsets.get(reduceId + numBlocks);
6565
return new ShuffleIndexRecord(offset, nextOffset - offset);
6666
}
6767
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() {
8383

8484
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
8585
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
86-
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
87-
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
86+
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, 1)).thenReturn(block0Marker);
87+
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, 1)).thenReturn(block1Marker);
8888
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
8989
new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
9090
.toByteBuffer();
@@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() {
106106
assertEquals(block0Marker, buffers.next());
107107
assertEquals(block1Marker, buffers.next());
108108
assertFalse(buffers.hasNext());
109-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
110-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
109+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, 1);
110+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, 1);
111111

112112
// Verify open block request latency metrics
113113
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void testBadRequests() throws IOException {
6666
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
6767
// Unregistered executor
6868
try {
69-
resolver.getBlockData("app0", "exec1", 1, 1, 0);
69+
resolver.getBlockData("app0", "exec1", 1, 1, 0, 1);
7070
fail("Should have failed");
7171
} catch (RuntimeException e) {
7272
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
@@ -75,7 +75,7 @@ public void testBadRequests() throws IOException {
7575
// Invalid shuffle manager
7676
try {
7777
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
78-
resolver.getBlockData("app0", "exec2", 1, 1, 0);
78+
resolver.getBlockData("app0", "exec2", 1, 1, 0, 1);
7979
fail("Should have failed");
8080
} catch (UnsupportedOperationException e) {
8181
// pass
@@ -85,7 +85,7 @@ public void testBadRequests() throws IOException {
8585
resolver.registerExecutor("app0", "exec3",
8686
dataContext.createExecutorInfo(SORT_MANAGER));
8787
try {
88-
resolver.getBlockData("app0", "exec3", 1, 1, 0);
88+
resolver.getBlockData("app0", "exec3", 1, 1, 0, 1);
8989
fail("Should have failed");
9090
} catch (Exception e) {
9191
// pass
@@ -99,18 +99,25 @@ public void testSortShuffleBlocks() throws IOException {
9999
dataContext.createExecutorInfo(SORT_MANAGER));
100100

101101
InputStream block0Stream =
102-
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
102+
resolver.getBlockData("app0", "exec0", 0, 0, 0, 1).createInputStream();
103103
String block0 = CharStreams.toString(
104104
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
105105
block0Stream.close();
106106
assertEquals(sortBlock0, block0);
107107

108108
InputStream block1Stream =
109-
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
109+
resolver.getBlockData("app0", "exec0", 0, 0, 1, 1).createInputStream();
110110
String block1 = CharStreams.toString(
111111
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
112112
block1Stream.close();
113113
assertEquals(sortBlock1, block1);
114+
115+
InputStream block01Stream =
116+
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
117+
String block01 = CharStreams.toString(
118+
new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
119+
block01Stream.close();
120+
assertEquals(sortBlock0 + sortBlock1, block01);
114121
}
115122

116123
@Test

common/network-yarn/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<parent>
2323
<groupId>org.apache.spark</groupId>
2424
<artifactId>spark-parent_2.11</artifactId>
25-
<version>2.4.1-kylin-r35</version>
25+
<version>2.4.1-kylin-r40</version>
2626
<relativePath>../../pom.xml</relativePath>
2727
</parent>
2828

0 commit comments

Comments
 (0)