Skip to content

Commit 13f7b05

Browse files
author
Davies Liu
committed
Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
Conflicts: python/pyspark/rddsampler.py
2 parents f583023 + 7f22fa8 commit 13f7b05

File tree

195 files changed

+8470
-1994
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

195 files changed

+8470
-1994
lines changed

bin/pyspark

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then
132132
gatherSparkSubmitOpts "$@"
133133
exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
134134
else
135-
# PySpark shell requires special handling downstream
136-
export PYSPARK_SHELL=1
137135
exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
138136
fi

bin/pyspark2.cmd

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
5959
)
6060

6161
if [%PYTHON_FILE%] == [] (
62-
set PYSPARK_SHELL=1
6362
if [%IPYTHON%] == [1] (
6463
ipython %IPYTHON_OPTS%
6564
) else (

bin/spark-submit

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
2323
ORIG_ARGS=("$@")
2424

25+
# Set COLUMNS for progress bar
26+
export COLUMNS=`tput cols`
27+
2528
while (($#)); do
2629
if [ "$1" = "--deploy-mode" ]; then
2730
SPARK_SUBMIT_DEPLOY_MODE=$2

conf/spark-env.sh.template

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
2929
# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.
3030

31-
# Options for the daemons used in the standalone deploy mode:
31+
# Options for the daemons used in the standalone deploy mode
3232
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
3333
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
3434
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
@@ -41,3 +41,10 @@
4141
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
4242
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
4343
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
44+
45+
# Generic options for the daemons used in the standalone deploy mode
46+
# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
47+
# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
48+
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
49+
# - SPARK_IDENT_STRING A string representing this instance of Spark. (Default: $USER)
50+
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)

core/src/main/java/org/apache/spark/SparkStageInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
public interface SparkStageInfo {
2727
int stageId();
2828
int currentAttemptId();
29+
long submissionTime();
2930
String name();
3031
int numTasks();
3132
int numActiveTasks();

core/src/main/java/org/apache/spark/api/java/function/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
2222
* these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
2323
* Java programming guide for more details.
2424
*/
25-
package object function
25+
package object function

core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ $(function() {
2626
// Switch the class of the arrow from open to closed.
2727
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
2828
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');
29-
30-
// If clicking caused the metrics to expand, automatically check all options for additional
31-
// metrics (don't trigger a click when collapsing metrics, because it leads to weird
32-
// toggling behavior).
33-
if (!$(additionalMetricsDiv).hasClass('collapsed')) {
34-
$(this).parent().find('input:checkbox:not(:checked)').trigger('click');
35-
}
3629
});
3730

3831
$("input:checkbox:not(:checked)").each(function() {
@@ -48,6 +41,16 @@ $(function() {
4841
stripeTables();
4942
});
5043

44+
$("#select-all-metrics").click(function() {
45+
if (this.checked) {
46+
// Toggle all un-checked options.
47+
$('input:checkbox:not(:checked)').trigger('click');
48+
} else {
49+
// Toggle all checked options.
50+
$('input:checkbox:checked').trigger('click');
51+
}
52+
});
53+
5154
// Trigger a click on the checkbox if a user clicks the label next to it.
5255
$("span.additional-metric-title").click(function() {
5356
$(this).parent().find('input:checkbox').trigger('click');

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
2828
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
2929
* persists for another M seconds, then more executors are added and so on. The number added
3030
* in each round increases exponentially from the previous round until an upper bound on the
31-
* number of executors has been reached.
31+
* number of executors has been reached. The upper bound is based both on a configured property
32+
* and on the number of tasks pending: the policy will never increase the number of executor
33+
* requests past the number needed to handle all pending tasks.
3234
*
3335
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
3436
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
8284
// During testing, the methods to actually kill and add executors are mocked out
8385
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
8486

87+
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
88+
// allocation is only supported for YARN and the default number of cores per executor in YARN is
89+
// 1, but it might need to be obtained differently for different cluster managers
90+
private val tasksPerExecutor =
91+
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
92+
8593
validateSettings()
8694

8795
// Number of executors to add in the next round
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
110118
// Clock used to schedule when executors should be added and removed
111119
private var clock: Clock = new RealClock
112120

121+
// Listener for Spark events that impact the allocation policy
122+
private val listener = new ExecutorAllocationListener(this)
123+
113124
/**
114125
* Verify that the settings specified through the config are valid.
115126
* If not, throw an appropriate exception.
@@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
141152
throw new SparkException("Dynamic allocation of executors requires the external " +
142153
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
143154
}
155+
if (tasksPerExecutor == 0) {
156+
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
157+
}
144158
}
145159

146160
/**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
154168
* Register for scheduler callbacks to decide when to add and remove executors.
155169
*/
156170
def start(): Unit = {
157-
val listener = new ExecutorAllocationListener(this)
158171
sc.addSparkListener(listener)
159172
startPolling()
160173
}
@@ -218,13 +231,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
218231
return 0
219232
}
220233

221-
// Request executors with respect to the upper bound
222-
val actualNumExecutorsToAdd =
223-
if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
224-
numExecutorsToAdd
225-
} else {
226-
maxNumExecutors - numExistingExecutors
227-
}
234+
// The number of executors needed to satisfy all pending tasks is the number of tasks pending
235+
// divided by the number of tasks each executor can fit, rounded up.
236+
val maxNumExecutorsPending =
237+
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
238+
if (numExecutorsPending >= maxNumExecutorsPending) {
239+
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
240+
s"pending and pending tasks could only fill $maxNumExecutorsPending")
241+
numExecutorsToAdd = 1
242+
return 0
243+
}
244+
245+
// It's never useful to request more executors than could satisfy all the pending tasks, so
246+
// cap request at that amount.
247+
// Also cap request with respect to the configured upper bound.
248+
val maxNumExecutorsToAdd = math.min(
249+
maxNumExecutorsPending - numExecutorsPending,
250+
maxNumExecutors - numExistingExecutors)
251+
assert(maxNumExecutorsToAdd > 0)
252+
253+
val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
254+
228255
val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
229256
val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
230257
if (addRequestAcknowledged) {
@@ -445,6 +472,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
445472
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
446473
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
447474
}
475+
476+
/**
477+
* An estimate of the total number of pending tasks remaining for currently running stages. Does
478+
* not account for tasks which may have failed and been resubmitted.
479+
*/
480+
def totalPendingTasks(): Int = {
481+
stageIdToNumTasks.map { case (stageId, numTasks) =>
482+
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
483+
}.sum
484+
}
448485
}
449486

450487
}

0 commit comments

Comments
 (0)