Skip to content

Commit 1a43bcd

Browse files
dongjoon-hyunGitHub Enterprise
authored andcommitted
[SPARK-37819][K8S] Add OUTLIER executor roll policy and use it by default (apache#1323)
### What changes were proposed in this pull request? This PR aims to add a new executor roll policy, `OUTLIER`, which aims to detect various outliers first, and use it by default. If there is no outlier, it will work like `TOTAL_DURATION` policy. ### Why are the changes needed? The users can use `OUTLIER` policy to consider the outliers in terms of multiple dimensions. In addition, this will be a better default policy. ### Does this PR introduce _any_ user-facing change? No. This is a new feature in Apache Spark 3.3. ### How was this patch tested? Pass the CIs with the newly added test cases.
1 parent 48733a5 commit 1a43bcd

File tree

3 files changed

+106
-12
lines changed

3 files changed

+106
-12
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,26 +147,30 @@ private[spark] object Config extends Logging {
147147
.createWithDefault(0)
148148

149149
object ExecutorRollPolicy extends Enumeration {
150-
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS = Value
150+
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS, OUTLIER = Value
151151
}
152152

153153
val EXECUTOR_ROLL_POLICY =
154154
ConfigBuilder("spark.kubernetes.executor.rollPolicy")
155-
.doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME (default), " +
156-
"TOTAL_DURATION, and FAILED_TASKS. " +
155+
.doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME, " +
156+
"TOTAL_DURATION, FAILED_TASKS, and OUTLIER (default). " +
157157
"When executor roll happens, Spark uses this policy to choose " +
158158
"an executor and decommission it. The built-in policies are based on executor summary." +
159159
"ID policy chooses an executor with the smallest executor ID. " +
160160
"ADD_TIME policy chooses an executor with the smallest add-time. " +
161161
"TOTAL_GC_TIME policy chooses an executor with the biggest total task GC time. " +
162162
"TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
163163
"AVERAGE_DURATION policy chooses an executor with the biggest average task time. " +
164-
"FAILED_TASKS policy chooses an executor with the most number of failed tasks.")
164+
"FAILED_TASKS policy chooses an executor with the most number of failed tasks. " +
165+
"OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
166+
"at least two standard deviation from the mean in average task time, " +
167+
"total task time, total task GC time, and the number of failed tasks if exists. " +
168+
"If there is no outlier, it works like TOTAL_DURATION policy.")
165169
.version("3.3.0")
166170
.stringConf
167171
.transform(_.toUpperCase(Locale.ROOT))
168172
.checkValues(ExecutorRollPolicy.values.map(_.toString))
169-
.createWithDefault(ExecutorRollPolicy.TOTAL_GC_TIME.toString)
173+
.createWithDefault(ExecutorRollPolicy.OUTLIER.toString)
170174

171175
val MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING =
172176
ConfigBuilder("spark.kubernetes.executor.minTasksPerExecutorBeforeRolling")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.spark.scheduler.cluster.k8s
1818

19+
import java.lang.Math.sqrt
1920
import java.util.{Map => JMap}
2021
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
2122

@@ -116,7 +117,42 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
116117
listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
117118
case ExecutorRollPolicy.FAILED_TASKS =>
118119
listWithoutDriver.sortBy(_.failedTasks).reverse
120+
case ExecutorRollPolicy.OUTLIER =>
121+
// We build multiple outlier lists and concat in the following importance order to find
122+
// outliers in various perspective:
123+
// AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS
124+
// Since we will choose only first item, the duplication is okay. If there is no outlier,
125+
// We fallback to TOTAL_DURATION policy.
126+
outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration / e.totalTasks) ++
127+
outliers(listWithoutDriver, e => e.totalDuration) ++
128+
outliers(listWithoutDriver, e => e.totalGCTime) ++
129+
outliers(listWithoutDriver, e => e.failedTasks) ++
130+
listWithoutDriver.sortBy(_.totalDuration).reverse
119131
}
120132
sortedList.headOption.map(_.id)
121133
}
134+
135+
/**
136+
* Return executors whose metrics is outstanding, '(value - mean) > 2-sigma'. This is
137+
* a best-effort approach because the snapshot of ExecutorSummary is not a normal distribution.
138+
* Outliers can be defined in several ways (https://en.wikipedia.org/wiki/Outlier).
139+
* Here, we borrowed 2-sigma idea from https://en.wikipedia.org/wiki/68-95-99.7_rule.
140+
* In case of normal distribution, this is known to be 2.5 percent roughly.
141+
*/
142+
private def outliers(
143+
list: Seq[v1.ExecutorSummary],
144+
get: v1.ExecutorSummary => Float): Seq[v1.ExecutorSummary] = {
145+
if (list.isEmpty) {
146+
list
147+
} else {
148+
val size = list.size
149+
val values = list.map(get)
150+
val mean = values.sum / size
151+
val sd = sqrt(values.map(v => (v - mean) * (v - mean)).sum / size)
152+
list
153+
.filter(e => (get(e) - mean) > 2 * sd)
154+
.sortBy(e => get(e))
155+
.reverse
156+
}
157+
}
122158
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
4242
val execWithSmallestID = new ExecutorSummary("1", "host:port", true, 1,
4343
10, 10, 1, 1, 1,
4444
0, 0, 1, 100,
45-
1, 100, 100,
45+
20, 100, 100,
4646
10, false, 20, new Date(1639300001000L),
4747
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
4848
false, Set())
@@ -51,7 +51,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
5151
val execWithSmallestAddTime = new ExecutorSummary("2", "host:port", true, 1,
5252
10, 10, 1, 1, 1,
5353
0, 0, 1, 100,
54-
1, 100, 100,
54+
20, 100, 100,
5555
10, false, 20, new Date(1639300000000L),
5656
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
5757
false, Set())
@@ -60,7 +60,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
6060
val execWithBiggestTotalGCTime = new ExecutorSummary("3", "host:port", true, 1,
6161
10, 10, 1, 1, 1,
6262
0, 0, 1, 100,
63-
4, 100, 100,
63+
40, 100, 100,
6464
10, false, 20, new Date(1639300002000L),
6565
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
6666
false, Set())
@@ -69,7 +69,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
6969
val execWithBiggestTotalDuration = new ExecutorSummary("4", "host:port", true, 1,
7070
10, 10, 1, 1, 1,
7171
0, 0, 4, 400,
72-
1, 100, 100,
72+
20, 100, 100,
7373
10, false, 20, new Date(1639300003000L),
7474
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
7575
false, Set())
@@ -78,7 +78,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
7878
val execWithBiggestFailedTasks = new ExecutorSummary("5", "host:port", true, 1,
7979
10, 10, 1, 1, 1,
8080
5, 0, 1, 100,
81-
1, 100, 100,
81+
20, 100, 100,
8282
10, false, 20, new Date(1639300003000L),
8383
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
8484
false, Set())
@@ -87,7 +87,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
8787
val execWithBiggestAverageDuration = new ExecutorSummary("6", "host:port", true, 1,
8888
10, 10, 1, 1, 1,
8989
0, 0, 2, 300,
90-
1, 100, 100,
90+
20, 100, 100,
9191
10, false, 20, new Date(1639300003000L),
9292
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
9393
false, Set())
@@ -101,9 +101,18 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
101101
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
102102
false, Set())
103103

104+
// This is used to stabilize 'mean' and 'sd' in OUTLIER test cases.
105+
val execNormal = new ExecutorSummary("8", "host:port", true, 1,
106+
10, 10, 1, 1, 1,
107+
4, 0, 2, 280,
108+
30, 100, 100,
109+
10, false, 20, new Date(1639300001000L),
110+
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
111+
false, Set())
112+
104113
val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
105114
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
106-
execWithBiggestAverageDuration, execWithoutTasks)
115+
execWithBiggestAverageDuration, execWithoutTasks, execNormal)
107116

108117
override def beforeEach(): Unit = {
109118
super.beforeEach()
@@ -162,4 +171,49 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
162171
Some("6"),
163172
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.AVERAGE_DURATION)))
164173
}
174+
175+
test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
176+
assertEquals(
177+
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)),
178+
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.OUTLIER)))
179+
}
180+
181+
test("Policy: OUTLIER - Detect an average task duration outlier") {
182+
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
183+
0, 0, 1, 0, 0,
184+
3, 0, 1, 300,
185+
20, 0, 0,
186+
0, false, 0, new Date(1639300001000L),
187+
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
188+
false, Set())
189+
assertEquals(
190+
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.AVERAGE_DURATION)),
191+
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
192+
}
193+
194+
test("Policy: OUTLIER - Detect a total task duration outlier") {
195+
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
196+
0, 0, 1, 0, 0,
197+
3, 0, 1000, 1000,
198+
0, 0, 0,
199+
0, false, 0, new Date(1639300001000L),
200+
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
201+
false, Set())
202+
assertEquals(
203+
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_DURATION)),
204+
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
205+
}
206+
207+
test("Policy: OUTLIER - Detect a total GC time outlier") {
208+
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
209+
0, 0, 1, 0, 0,
210+
3, 0, 1, 100,
211+
1000, 0, 0,
212+
0, false, 0, new Date(1639300001000L),
213+
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
214+
false, Set())
215+
assertEquals(
216+
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME)),
217+
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
218+
}
165219
}

0 commit comments

Comments
 (0)