@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
20
20
import java .util .{List => JList }
21
21
import java .util .concurrent ._
22
22
import java .util .concurrent .atomic .AtomicInteger
23
+ import java .util .regex .Pattern
23
24
24
25
import scala .collection .JavaConversions ._
25
26
import scala .collection .mutable .{ArrayBuffer , HashMap , HashSet }
@@ -330,12 +331,21 @@ private[yarn] abstract class YarnAllocator(
330
331
logInfo(" Completed container %s (state: %s, exit status: %s)" .format(
331
332
containerId,
332
333
completedContainer.getState,
333
- completedContainer.getExitStatus() ))
334
+ completedContainer.getExitStatus))
334
335
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
335
336
// there are some exit status' we shouldn't necessarily count against us, but for
336
337
// now I think its ok as none of the containers are expected to exit
337
- if (completedContainer.getExitStatus() != 0 ) {
338
- logInfo(" Container marked as failed: " + containerId)
338
+ if (completedContainer.getExitStatus == - 103 ) { // vmem limit exceeded
339
+ logWarning(MemLimitLogger .memLimitExceededLogMessage(
340
+ completedContainer.getDiagnostics,
341
+ MemLimitLogger .VMEM_EXCEEDED_PATTERN ))
342
+ } else if (completedContainer.getExitStatus == - 104 ) { // pmem limit exceeded
343
+ logWarning(MemLimitLogger .memLimitExceededLogMessage(
344
+ completedContainer.getDiagnostics,
345
+ MemLimitLogger .PMEM_EXCEEDED_PATTERN ))
346
+ } else if (completedContainer.getExitStatus != 0 ) {
347
+ logInfo(" Container marked as failed: " + containerId + " . Exit status: " +
348
+ completedContainer.getExitStatus)
339
349
numExecutorsFailed.incrementAndGet()
340
350
}
341
351
}
@@ -463,3 +473,19 @@ private[yarn] abstract class YarnAllocator(
463
473
}
464
474
465
475
}
476
+
477
+ private [yarn] object MemLimitLogger {
478
+ private val MEM_REGEX = " [0-9.]+ [KMG]B"
479
+ val PMEM_EXCEEDED_PATTERN =
480
+ Pattern .compile(s " $MEM_REGEX of $MEM_REGEX physical memory used " )
481
+ val VMEM_EXCEEDED_PATTERN =
482
+ Pattern .compile(s " $MEM_REGEX of $MEM_REGEX virtual memory used " )
483
+
484
+ def memLimitExceededLogMessage (diagnostics : String , pattern : Pattern ): String = {
485
+ val matcher = pattern.matcher(diagnostics)
486
+ val diag = if (matcher.find()) " " + matcher.group() + " ." else " "
487
+ (" Container killed by YARN for exceeding memory limits." + diag
488
+ + " Consider boosting spark.yarn.executor.memoryOverhead." )
489
+ }
490
+
491
+ }
0 commit comments