Commit 5599d55
[RFC] Disable local execution of Spark jobs by default
Currently, local execution of Spark jobs is used only by take(), and it can be problematic because it may load a significant amount of data onto the driver. The worst cases occur when the RDD is cached (the whole partition is then guaranteed to be loaded), has very large elements, or the partition is simply large and we apply a filter with high selectivity or high computational overhead.

Additionally, jobs that run locally in this manner do not show up in the web UI, which makes them harder to track and understand.

This PR adds a flag, spark.localExecution.enabled, which is OFF by default, so local execution is now disabled unless explicitly requested; the intention is to perhaps eventually remove this functionality altogether. Removing it now is a tougher proposition, since it is part of the public runJob API. An alternative solution would be to limit the flag to take()/first() to avoid impacting any external users of this API, but such usage (or at least reliance upon the feature) is hopefully minimal.
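For anyone who relies on the old behavior, opting back in is a one-line configuration change. A minimal sketch, assuming a standard SparkConf setup (the property name comes from this patch; the surrounding application code is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Opt back in to driver-side local execution for eligible jobs.
val conf = new SparkConf()
  .setAppName("LocalExecutionOptIn")
  .setMaster("local[4]") // cluster manager setting; unrelated to spark.localExecution.enabled
  .set("spark.localExecution.enabled", "true")
val sc = new SparkContext(conf)

// take(1) submits a single-partition job with no parent stages, so with the
// flag enabled it may run entirely on the driver instead of launching tasks.
val first = sc.parallelize(1 to 100, 4).take(1)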
1 parent 9497b12 commit 5599d55

2 files changed: +15 lines, -1 line

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 6 additions & 1 deletion

@@ -121,6 +121,9 @@ class DAGScheduler(
 
   private[scheduler] var eventProcessActor: ActorRef = _
 
+  /** If enabled, we may run certain actions like take() and first() locally. */
+  private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
+
   private def initializeEventProcessActor() {
     // blocking the thread until supervisor is started, which ensures eventProcessActor is
     // not null before any job is submitted
@@ -732,7 +735,9 @@ class DAGScheduler(
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
-    if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+    val shouldRunLocally =
+      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+    if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
       listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
       runLocally(job)
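For context on the allowLocal parameter in the gate above: it enters through the public runJob API mentioned in the commit message. A hedged sketch of a direct call, using what I believe is the Spark 1.x overload with an explicit allowLocal flag:

// take() and first() funnel into this API with allowLocal = true. After this
// patch, a job must also have spark.localExecution.enabled=true (plus a single
// target partition and no parent stages) to run locally on the driver.
val sizes: Array[Int] = sc.runJob(
  sc.parallelize(1 to 100, 4),
  (iter: Iterator[Int]) => iter.size,
  Seq(0),            // only the first partition
  allowLocal = true)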

docs/configuration.md

Lines changed: 9 additions & 0 deletions

@@ -846,6 +846,15 @@ Apart from these, the following properties are also available, and may be useful
     (in milliseconds).
   </td>
 </tr>
+<tr>
+  <td><code>spark.localExecution.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enables Spark to run certain jobs, such as first() or take() on the driver, without sending
+    tasks to the cluster. This can make certain jobs execute very quickly, but may require
+    shipping a whole partition of data to the driver.
+  </td>
+</tr>
 </table>
 
 #### Security
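The caveat in the last sentence of that documentation entry is the motivation for defaulting to false. A hypothetical illustration of the risky pattern (the path and the predicate are made up):

// With spark.localExecution.enabled=true, this take(1) evaluates the filter
// over the first partition on the driver; if matches are rare, that can mean
// scanning (and, for a cached RDD, fully loading) the whole partition there.
val rareLine = sc.textFile("hdfs:///logs/app.log")
  .filter(_.contains("VERY_RARE_TOKEN"))
  .take(1)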
