Skip to content

Commit acd4ac7

Browse files
sryzatgravescs
authored andcommitted
SPARK-3837. Warn when YARN kills containers for exceeding memory limits
I triggered the issue and verified the message gets printed on a pseudo-distributed cluster. Author: Sandy Ryza <[email protected]> Closes #2744 from sryza/sandy-spark-3837 and squashes the following commits: 858a268 [Sandy Ryza] Review feedback c937f00 [Sandy Ryza] SPARK-3837. Warn when YARN kills containers for exceeding memory limits
1 parent 58a6077 commit acd4ac7

File tree

2 files changed

+61
-3
lines changed

2 files changed

+61
-3
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
2020
import java.util.{List => JList}
2121
import java.util.concurrent._
2222
import java.util.concurrent.atomic.AtomicInteger
23+
import java.util.regex.Pattern
2324

2425
import scala.collection.JavaConversions._
2526
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -375,12 +376,22 @@ private[yarn] abstract class YarnAllocator(
375376
logInfo("Completed container %s (state: %s, exit status: %s)".format(
376377
containerId,
377378
completedContainer.getState,
378-
completedContainer.getExitStatus()))
379+
completedContainer.getExitStatus))
379380
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
380381
// there are some exit status' we shouldn't necessarily count against us, but for
381382
// now I think its ok as none of the containers are expected to exit
382-
if (completedContainer.getExitStatus() != 0) {
383-
logInfo("Container marked as failed: " + containerId)
383+
if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
384+
logWarning(memLimitExceededLogMessage(
385+
completedContainer.getDiagnostics,
386+
VMEM_EXCEEDED_PATTERN))
387+
} else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
388+
logWarning(memLimitExceededLogMessage(
389+
completedContainer.getDiagnostics,
390+
PMEM_EXCEEDED_PATTERN))
391+
} else if (completedContainer.getExitStatus != 0) {
392+
logInfo("Container marked as failed: " + containerId +
393+
". Exit status: " + completedContainer.getExitStatus +
394+
". Diagnostics: " + completedContainer.getDiagnostics)
384395
numExecutorsFailed.incrementAndGet()
385396
}
386397
}
@@ -428,6 +439,19 @@ private[yarn] abstract class YarnAllocator(
428439
}
429440
}
430441

442+
private val MEM_REGEX = "[0-9.]+ [KMG]B"
443+
private val PMEM_EXCEEDED_PATTERN =
444+
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
445+
private val VMEM_EXCEEDED_PATTERN =
446+
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
447+
448+
def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
449+
val matcher = pattern.matcher(diagnostics)
450+
val diag = if (matcher.find()) " " + matcher.group() + "." else ""
451+
("Container killed by YARN for exceeding memory limits." + diag
452+
+ " Consider boosting spark.yarn.executor.memoryOverhead.")
453+
}
454+
431455
protected def allocatedContainersOnHost(host: String): Int = {
432456
var retval = 0
433457
allocatedHostToContainersMap.synchronized {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.yarn
19+
20+
import org.apache.spark.deploy.yarn.MemLimitLogger._
21+
import org.scalatest.FunSuite
22+
23+
class YarnAllocatorSuite extends FunSuite {
24+
test("memory exceeded diagnostic regexes") {
25+
val diagnostics =
26+
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
27+
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
28+
"5.8 GB of 4.2 GB virtual memory used. Killing container."
29+
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
30+
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
31+
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
32+
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
33+
}
34+
}

0 commit comments

Comments
 (0)