|
18 | 18 | package org.apache.spark;
|
19 | 19 |
|
20 | 20 | import java.io.Serializable;
|
21 |
| -import java.util.ArrayList; |
22 |
| -import java.util.Collections; |
23 |
| -import java.util.List; |
24 | 21 |
|
25 | 22 | import scala.Function0;
|
26 | 23 | import scala.Function1;
|
27 | 24 | import scala.Unit;
|
28 |
| -import scala.collection.JavaConversions; |
29 | 25 |
|
30 | 26 | import org.apache.spark.annotation.DeveloperApi;
|
31 | 27 | import org.apache.spark.executor.TaskMetrics;
|
32 | 28 | import org.apache.spark.util.TaskCompletionListener;
|
33 |
| -import org.apache.spark.util.TaskCompletionListenerException; |
34 | 29 |
|
35 | 30 | /**
|
36 |
| -* :: DeveloperApi :: |
37 |
| -* Contextual information about a task which can be read or mutated during execution. |
38 |
| -*/ |
39 |
| -@DeveloperApi |
40 |
| -public class TaskContext implements Serializable { |
41 |
| - |
42 |
| - private int stageId; |
43 |
| - private int partitionId; |
44 |
| - private long attemptId; |
45 |
| - private boolean runningLocally; |
46 |
| - private TaskMetrics taskMetrics; |
47 |
| - |
48 |
| - /** |
49 |
| - * :: DeveloperApi :: |
50 |
| - * Contextual information about a task which can be read or mutated during execution. |
51 |
| - * |
52 |
| - * @param stageId stage id |
53 |
| - * @param partitionId index of the partition |
54 |
| - * @param attemptId the number of attempts to execute this task |
55 |
| - * @param runningLocally whether the task is running locally in the driver JVM |
56 |
| - * @param taskMetrics performance metrics of the task |
57 |
| - */ |
58 |
| - @DeveloperApi |
59 |
| - public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally, |
60 |
| - TaskMetrics taskMetrics) { |
61 |
| - this.attemptId = attemptId; |
62 |
| - this.partitionId = partitionId; |
63 |
| - this.runningLocally = runningLocally; |
64 |
| - this.stageId = stageId; |
65 |
| - this.taskMetrics = taskMetrics; |
66 |
| - } |
67 |
| - |
68 |
| - /** |
69 |
| - * :: DeveloperApi :: |
70 |
| - * Contextual information about a task which can be read or mutated during execution. |
71 |
| - * |
72 |
| - * @param stageId stage id |
73 |
| - * @param partitionId index of the partition |
74 |
| - * @param attemptId the number of attempts to execute this task |
75 |
| - * @param runningLocally whether the task is running locally in the driver JVM |
76 |
| - */ |
77 |
| - @DeveloperApi |
78 |
| - public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) { |
79 |
| - this.attemptId = attemptId; |
80 |
| - this.partitionId = partitionId; |
81 |
| - this.runningLocally = runningLocally; |
82 |
| - this.stageId = stageId; |
83 |
| - this.taskMetrics = TaskMetrics.empty(); |
84 |
| - } |
85 |
| - |
| 31 | + * Contextual information about a task which can be read or mutated during |
| 32 | + * execution. To access the TaskContext for a running task use |
| 33 | + * TaskContext.get(). |
| 34 | + */ |
| 35 | +public abstract class TaskContext implements Serializable { |
86 | 36 | /**
|
87 |
| - * :: DeveloperApi :: |
88 |
| - * Contextual information about a task which can be read or mutated during execution. |
89 |
| - * |
90 |
| - * @param stageId stage id |
91 |
| - * @param partitionId index of the partition |
92 |
| - * @param attemptId the number of attempts to execute this task |
| 37 | + * Return the currently active TaskContext. This can be called inside of |
| 38 | + * user functions to access contextual information about running tasks. |
93 | 39 | */
|
94 |
| - @DeveloperApi |
95 |
| - public TaskContext(int stageId, int partitionId, long attemptId) { |
96 |
| - this.attemptId = attemptId; |
97 |
| - this.partitionId = partitionId; |
98 |
| - this.runningLocally = false; |
99 |
| - this.stageId = stageId; |
100 |
| - this.taskMetrics = TaskMetrics.empty(); |
| 40 | + public static TaskContext get() { |
| 41 | + return taskContext.get(); |
101 | 42 | }
|
102 | 43 |
|
103 | 44 | private static ThreadLocal<TaskContext> taskContext =
|
104 | 45 | new ThreadLocal<TaskContext>();
|
105 | 46 |
|
106 |
| - /** |
107 |
| - * :: Internal API :: |
108 |
| - * This is spark internal API, not intended to be called from user programs. |
109 |
| - */ |
110 |
| - public static void setTaskContext(TaskContext tc) { |
| 47 | + static void setTaskContext(TaskContext tc) { |
111 | 48 | taskContext.set(tc);
|
112 | 49 | }
|
113 | 50 |
|
114 |
| - public static TaskContext get() { |
115 |
| - return taskContext.get(); |
116 |
| - } |
117 |
| - |
118 |
| - /** :: Internal API :: */ |
119 |
| - public static void unset() { |
| 51 | + static void unset() { |
120 | 52 | taskContext.remove();
|
121 | 53 | }
|
122 | 54 |
|
123 |
| - // List of callback functions to execute when the task completes. |
124 |
| - private transient List<TaskCompletionListener> onCompleteCallbacks = |
125 |
| - new ArrayList<TaskCompletionListener>(); |
126 |
| - |
127 |
| - // Whether the corresponding task has been killed. |
128 |
| - private volatile boolean interrupted = false; |
129 |
| - |
130 |
| - // Whether the task has completed. |
131 |
| - private volatile boolean completed = false; |
132 |
| - |
133 | 55 | /**
|
134 |
| - * Checks whether the task has completed. |
| 56 | + * Whether the task has completed. |
135 | 57 | */
|
136 |
| - public boolean isCompleted() { |
137 |
| - return completed; |
138 |
| - } |
| 58 | + public abstract boolean isCompleted(); |
139 | 59 |
|
140 | 60 | /**
|
141 |
| - * Checks whether the task has been killed. |
| 61 | + * Whether the task has been killed. |
142 | 62 | */
|
143 |
| - public boolean isInterrupted() { |
144 |
| - return interrupted; |
145 |
| - } |
| 63 | + public abstract boolean isInterrupted(); |
| 64 | + |
| 65 | + /** @deprecated: use isRunningLocally() */ |
| 66 | + @Deprecated |
| 67 | + public abstract boolean runningLocally(); |
| 68 | + |
| 69 | + public abstract boolean isRunningLocally(); |
146 | 70 |
|
147 | 71 | /**
|
148 | 72 | * Add a (Java friendly) listener to be executed on task completion.
|
149 | 73 | * This will be called in all situation - success, failure, or cancellation.
|
150 | 74 | * <p/>
|
151 | 75 | * An example use is for HadoopRDD to register a callback to close the input stream.
|
152 | 76 | */
|
153 |
| - public TaskContext addTaskCompletionListener(TaskCompletionListener listener) { |
154 |
| - onCompleteCallbacks.add(listener); |
155 |
| - return this; |
156 |
| - } |
| 77 | + public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener); |
157 | 78 |
|
158 | 79 | /**
|
159 | 80 | * Add a listener in the form of a Scala closure to be executed on task completion.
|
160 | 81 | * This will be called in all situations - success, failure, or cancellation.
|
161 | 82 | * <p/>
|
162 | 83 | * An example use is for HadoopRDD to register a callback to close the input stream.
|
163 | 84 | */
|
164 |
| - public TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f) { |
165 |
| - onCompleteCallbacks.add(new TaskCompletionListener() { |
166 |
| - @Override |
167 |
| - public void onTaskCompletion(TaskContext context) { |
168 |
| - f.apply(context); |
169 |
| - } |
170 |
| - }); |
171 |
| - return this; |
172 |
| - } |
| 85 | + public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f); |
173 | 86 |
|
174 | 87 | /**
|
175 | 88 | * Add a callback function to be executed on task completion. An example use
|
176 | 89 | * is for HadoopRDD to register a callback to close the input stream.
|
177 | 90 | * Will be called in any situation - success, failure, or cancellation.
|
178 | 91 | *
|
179 |
| - * Deprecated: use addTaskCompletionListener |
180 |
| - * |
| 92 | + * @deprecated: use addTaskCompletionListener |
| 93 | + * |
181 | 94 | * @param f Callback function.
|
182 | 95 | */
|
183 | 96 | @Deprecated
|
184 |
| - public void addOnCompleteCallback(final Function0<Unit> f) { |
185 |
| - onCompleteCallbacks.add(new TaskCompletionListener() { |
186 |
| - @Override |
187 |
| - public void onTaskCompletion(TaskContext context) { |
188 |
| - f.apply(); |
189 |
| - } |
190 |
| - }); |
191 |
| - } |
192 |
| - |
193 |
| - /** |
194 |
| - * ::Internal API:: |
195 |
| - * Marks the task as completed and triggers the listeners. |
196 |
| - */ |
197 |
| - public void markTaskCompleted() throws TaskCompletionListenerException { |
198 |
| - completed = true; |
199 |
| - List<String> errorMsgs = new ArrayList<String>(2); |
200 |
| - // Process complete callbacks in the reverse order of registration |
201 |
| - List<TaskCompletionListener> revlist = |
202 |
| - new ArrayList<TaskCompletionListener>(onCompleteCallbacks); |
203 |
| - Collections.reverse(revlist); |
204 |
| - for (TaskCompletionListener tcl: revlist) { |
205 |
| - try { |
206 |
| - tcl.onTaskCompletion(this); |
207 |
| - } catch (Throwable e) { |
208 |
| - errorMsgs.add(e.getMessage()); |
209 |
| - } |
210 |
| - } |
211 |
| - |
212 |
| - if (!errorMsgs.isEmpty()) { |
213 |
| - throw new TaskCompletionListenerException(JavaConversions.asScalaBuffer(errorMsgs)); |
214 |
| - } |
215 |
| - } |
216 |
| - |
217 |
| - /** |
218 |
| - * ::Internal API:: |
219 |
| - * Marks the task for interruption, i.e. cancellation. |
220 |
| - */ |
221 |
| - public void markInterrupted() { |
222 |
| - interrupted = true; |
223 |
| - } |
224 |
| - |
225 |
| - @Deprecated |
226 |
| - /** Deprecated: use getStageId() */ |
227 |
| - public int stageId() { |
228 |
| - return stageId; |
229 |
| - } |
230 |
| - |
231 |
| - @Deprecated |
232 |
| - /** Deprecated: use getPartitionId() */ |
233 |
| - public int partitionId() { |
234 |
| - return partitionId; |
235 |
| - } |
236 |
| - |
237 |
| - @Deprecated |
238 |
| - /** Deprecated: use getAttemptId() */ |
239 |
| - public long attemptId() { |
240 |
| - return attemptId; |
241 |
| - } |
242 |
| - |
243 |
| - @Deprecated |
244 |
| - /** Deprecated: use isRunningLocally() */ |
245 |
| - public boolean runningLocally() { |
246 |
| - return runningLocally; |
247 |
| - } |
248 |
| - |
249 |
| - public boolean isRunningLocally() { |
250 |
| - return runningLocally; |
251 |
| - } |
| 97 | + public abstract void addOnCompleteCallback(final Function0<Unit> f); |
252 | 98 |
|
253 |
| - public int getStageId() { |
254 |
| - return stageId; |
255 |
| - } |
| 99 | + public abstract int stageId(); |
256 | 100 |
|
257 |
| - public int getPartitionId() { |
258 |
| - return partitionId; |
259 |
| - } |
| 101 | + public abstract int partitionId(); |
260 | 102 |
|
261 |
| - public long getAttemptId() { |
262 |
| - return attemptId; |
263 |
| - } |
| 103 | + public abstract long attemptId(); |
264 | 104 |
|
265 |
| - /** ::Internal API:: */ |
266 |
| - public TaskMetrics taskMetrics() { |
267 |
| - return taskMetrics; |
268 |
| - } |
| 105 | + /** ::DeveloperApi:: */ |
| 106 | + @DeveloperApi |
| 107 | + public abstract TaskMetrics taskMetrics(); |
269 | 108 | }
|
0 commit comments