Skip to content

Commit 8baf3ba

Browse files
committed
[SPARK-26660][FOLLOWUP] Add warning logs when broadcasting large task binary
## What changes were proposed in this pull request? The warning introduced in #23580 has a bug (see #23580 (comment)): the size check multiplied the task binary's byte length by 1000 instead of comparing the byte length against the threshold converted from KiB to bytes. This commit fixes that comparison (`taskBinaryBytes.length > TASK_SIZE_TO_WARN_KIB * 1024`) and renames the constant from `TASK_SIZE_TO_WARN_KB` to `TASK_SIZE_TO_WARN_KIB` for accuracy. ## How was this patch tested? N/A Closes #23668 from srowen/SPARK-26660.2. Authored-by: Sean Owen <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent dfed439 commit 8baf3ba

File tree

3 files changed

+6
-6
lines changed

3 files changed

+6
-6
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1162,7 +1162,7 @@ private[spark] class DAGScheduler(
11621162
partitions = stage.rdd.partitions
11631163
}
11641164

1165-
if (taskBinaryBytes.length * 1000 > TaskSetManager.TASK_SIZE_TO_WARN_KB) {
1165+
if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
11661166
logWarning(s"Broadcasting large task binary with size " +
11671167
s"${Utils.bytesToString(taskBinaryBytes.length)}")
11681168
}

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,12 +494,12 @@ private[spark] class TaskSetManager(
494494
abort(s"$msg Exception during serialization: $e")
495495
throw new TaskNotSerializableException(e)
496496
}
497-
if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
497+
if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024 &&
498498
!emittedTaskSizeWarning) {
499499
emittedTaskSizeWarning = true
500500
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
501-
s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
502-
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
501+
s"(${serializedTask.limit() / 1024} KiB). The maximum recommended task size is " +
502+
s"${TaskSetManager.TASK_SIZE_TO_WARN_KIB} KiB.")
503503
}
504504
addRunningTask(taskId)
505505

@@ -1101,5 +1101,5 @@ private[spark] class TaskSetManager(
11011101
private[spark] object TaskSetManager {
11021102
// The user will be warned if any stages contain a task that has a serialized size greater than
11031103
// this.
1104-
val TASK_SIZE_TO_WARN_KB = 100
1104+
val TASK_SIZE_TO_WARN_KIB = 100
11051105
}

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
153153
*/
154154
class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
155155

156-
val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
156+
val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024)
157157
val random = new Random(0)
158158
random.nextBytes(randomBuffer)
159159

0 commit comments

Comments
 (0)