
Commit d10bf00

Merge commit: 2 parents 7e0cc36 + b167a8c

File tree

119 files changed: +3650 -1621 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ conf/*.cmd
 conf/*.properties
 conf/*.conf
 conf/*.xml
+conf/slaves
 docs/_site
 docs/api
 target/

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ log4j.properties
 log4j.properties.template
 metrics.properties.template
 slaves
+slaves.template
 spark-env.sh
 spark-env.cmd
 spark-env.sh.template
File renamed without changes.

core/src/main/java/org/apache/spark/TaskContext.java

Lines changed: 269 additions & 0 deletions
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+ * :: DeveloperApi ::
+ * Contextual information about a task which can be read or mutated during execution.
+ */
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally,
+                     TaskMetrics taskMetrics) {
+    this.attemptId = attemptId;
+    this.partitionId = partitionId;
+    this.runningLocally = runningLocally;
+    this.stageId = stageId;
+    this.taskMetrics = taskMetrics;
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) {
+    this.attemptId = attemptId;
+    this.partitionId = partitionId;
+    this.runningLocally = runningLocally;
+    this.stageId = stageId;
+    this.taskMetrics = TaskMetrics.empty();
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(int stageId, int partitionId, long attemptId) {
+    this.attemptId = attemptId;
+    this.partitionId = partitionId;
+    this.runningLocally = false;
+    this.stageId = stageId;
+    this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal<TaskContext> taskContext =
+    new ThreadLocal<TaskContext>();
+
+  /**
+   * :: Internal API ::
+   * This is spark internal API, not intended to be called from user programs.
+   */
+  public static void setTaskContext(TaskContext tc) {
+    taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+    return taskContext.get();
+  }
+
+  /** :: Internal API :: */
+  public static void unset() {
+    taskContext.remove();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List<TaskCompletionListener> onCompleteCallbacks =
+    new ArrayList<TaskCompletionListener>();
+
+  // Whether the corresponding task has been killed.
+  private volatile boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile boolean completed = false;
+
+  /**
+   * Checks whether the task has completed.
+   */
+  public boolean isCompleted() {
+    return completed;
+  }
+
+  /**
+   * Checks whether the task has been killed.
+   */
+  public boolean isInterrupted() {
+    return interrupted;
+  }
+
+  /**
+   * Add a (Java friendly) listener to be executed on task completion.
+   * This will be called in all situations - success, failure, or cancellation.
+   * <p/>
+   * An example use is for HadoopRDD to register a callback to close the input stream.
+   */
+  public TaskContext addTaskCompletionListener(TaskCompletionListener listener) {
+    onCompleteCallbacks.add(listener);
+    return this;
+  }
+
+  /**
+   * Add a listener in the form of a Scala closure to be executed on task completion.
+   * This will be called in all situations - success, failure, or cancellation.
+   * <p/>
+   * An example use is for HadoopRDD to register a callback to close the input stream.
+   */
+  public TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f) {
+    onCompleteCallbacks.add(new TaskCompletionListener() {
+      @Override
+      public void onTaskCompletion(TaskContext context) {
+        f.apply(context);
+      }
+    });
+    return this;
+  }
+
+  /**
+   * Add a callback function to be executed on task completion. An example use
+   * is for HadoopRDD to register a callback to close the input stream.
+   * Will be called in any situation - success, failure, or cancellation.
+   *
+   * Deprecated: use addTaskCompletionListener
+   *
+   * @param f Callback function.
+   */
+  @Deprecated
+  public void addOnCompleteCallback(final Function0<Unit> f) {
+    onCompleteCallbacks.add(new TaskCompletionListener() {
+      @Override
+      public void onTaskCompletion(TaskContext context) {
+        f.apply();
+      }
+    });
+  }
+
+  /**
+   * ::Internal API::
+   * Marks the task as completed and triggers the listeners.
+   */
+  public void markTaskCompleted() throws TaskCompletionListenerException {
+    completed = true;
+    List<String> errorMsgs = new ArrayList<String>(2);
+    // Process complete callbacks in the reverse order of registration
+    List<TaskCompletionListener> revlist =
+      new ArrayList<TaskCompletionListener>(onCompleteCallbacks);
+    Collections.reverse(revlist);
+    for (TaskCompletionListener tcl: revlist) {
+      try {
+        tcl.onTaskCompletion(this);
+      } catch (Throwable e) {
+        errorMsgs.add(e.getMessage());
+      }
+    }
+
+    if (!errorMsgs.isEmpty()) {
+      throw new TaskCompletionListenerException(JavaConversions.asScalaBuffer(errorMsgs));
+    }
+  }
+
+  /**
+   * ::Internal API::
+   * Marks the task for interruption, i.e. cancellation.
+   */
+  public void markInterrupted() {
+    interrupted = true;
+  }
+
+  @Deprecated
+  /** Deprecated: use getStageId() */
+  public int stageId() {
+    return stageId;
+  }
+
+  @Deprecated
+  /** Deprecated: use getPartitionId() */
+  public int partitionId() {
+    return partitionId;
+  }
+
+  @Deprecated
+  /** Deprecated: use getAttemptId() */
+  public long attemptId() {
+    return attemptId;
+  }
+
+  @Deprecated
+  /** Deprecated: use isRunningLocally() */
+  public boolean runningLocally() {
+    return runningLocally;
+  }
+
+  public boolean isRunningLocally() {
+    return runningLocally;
+  }
+
+  public int getStageId() {
+    return stageId;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public long getAttemptId() {
+    return attemptId;
+  }
+
+  /** ::Internal API:: */
+  public TaskMetrics taskMetrics() {
+    return taskMetrics;
+  }
+}
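
For orientation, a brief usage sketch of the completion-listener API added above. Only TaskContext.get(), addTaskCompletionListener, and TaskCompletionListener.onTaskCompletion come from this diff; the wrapper class and method below are hypothetical, and get() is non-null only on a thread where Spark's runtime has installed the context via the internal setTaskContext.

    import java.io.Closeable;
    import java.io.IOException;

    import org.apache.spark.TaskContext;
    import org.apache.spark.util.TaskCompletionListener;

    // Hypothetical helper, not part of this commit: closes a per-task resource
    // when the task finishes, the same pattern HadoopRDD uses for input streams.
    public class TaskCleanupSketch {
      public static void closeOnCompletion(final Closeable resource) {
        // Thread-local lookup added by this commit; set by the runtime before
        // the task body runs, so this is only non-null inside task code.
        TaskContext context = TaskContext.get();
        if (context == null) {
          return;  // not running inside a Spark task
        }
        context.addTaskCompletionListener(new TaskCompletionListener() {
          @Override
          public void onTaskCompletion(TaskContext ctx) {
            // Invoked on success, failure, or cancellation; exceptions thrown
            // here are collected and rethrown by markTaskCompleted as a
            // TaskCompletionListenerException.
            try {
              resource.close();
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          }
        });
      }
    }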

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 8 additions & 29 deletions
@@ -18,10 +18,12 @@
 package org.apache.spark
 
 import java.io._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.concurrent.Await
+import scala.collection.JavaConversions._
 
 import akka.actor._
 import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
    * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
    * master's corresponding HashMap.
+   *
+   * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+   * thread-safe map.
    */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
@@ -339,11 +344,11 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  * MapOutputTrackerMaster.
  */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
-  protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+  protected val mapStatuses: Map[Int, Array[MapStatus]] =
+    new ConcurrentHashMap[Int, Array[MapStatus]]
 }
 
 private[spark] object MapOutputTracker {
-  private val LOG_BASE = 1.1
 
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -379,34 +384,8 @@
           throw new MetadataFetchFailedException(
             shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
         } else {
-          (status.location, decompressSize(status.compressedSizes(reduceId)))
+          (status.location, status.getSizeForBlock(reduceId))
         }
     }
   }
-
-  /**
-   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
-   * We do this by encoding the log base 1.1 of the size as an integer, which can support
-   * sizes up to 35 GB with at most 10% error.
-   */
-  def compressSize(size: Long): Byte = {
-    if (size == 0) {
-      0
-    } else if (size <= 1L) {
-      1
-    } else {
-      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
-    }
-  }
-
-  /**
-   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
-   */
-  def decompressSize(compressedSize: Byte): Long = {
-    if (compressedSize == 0) {
-      0
-    } else {
-      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
-    }
-  }
 }
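
Side note on the last hunk: the deleted helpers encoded each map output size as ceil(log base 1.1 of the size) in a single byte, so values up to 1.1^255 (about 35 GB) round-trip with at most ~10% relative error; this commit replaces that per-byte scheme with MapStatus.getSizeForBlock. A self-contained Java sketch of the old round trip (class and method names are illustrative; the logic mirrors the removed Scala code):

    // Self-contained sketch of the log-base-1.1 size encoding that this commit
    // removes from MapOutputTracker (rewritten in Java; names are illustrative).
    public class SizeCodecSketch {
      private static final double LOG_BASE = 1.1;

      // Mirrors the removed compressSize: one byte per block size.
      static byte compressSize(long size) {
        if (size == 0) {
          return 0;
        } else if (size <= 1L) {
          return 1;
        } else {
          return (byte) Math.min(255, (int) Math.ceil(Math.log(size) / Math.log(LOG_BASE)));
        }
      }

      // Mirrors the removed decompressSize: the inverse, accurate to ~10%.
      static long decompressSize(byte compressed) {
        return compressed == 0 ? 0L : (long) Math.pow(LOG_BASE, compressed & 0xFF);
      }

      public static void main(String[] args) {
        long size = 123_456_789L;            // ~118 MB shuffle block
        byte code = compressSize(size);      // fits in a single byte
        long approx = decompressSize(code);  // recovered within ~10%
        System.out.printf("size=%d code=%d approx=%d err=%+.2f%%%n",
            size, code & 0xFF, approx, 100.0 * (approx - size) / (double) size);
      }
    }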
