
Commit 8d0dea0

wangshisan authored and LantaoJin committed
[CARMEL-640] add max memory limit for result tasks (#45)
1 parent 7c460f1 commit 8d0dea0

File tree

3 files changed: 29 additions, 1 deletion


core/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 7 additions & 0 deletions
@@ -50,3 +50,10 @@ private[spark] case class ExecutorDeadException(message: String)
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
   extends RuntimeException("You may get a different result due to the upgrading of Spark" +
     s" $version: $message", cause)
+
+/**
+ * A poison-pill exception: once a task throws such an exception, the scheduler will abort
+ * the whole stage the task belongs to.
+ */
+private[spark] case class PoisonPillException(message: String, cause: Throwable = null)
+  extends SparkException(message, cause)
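
The contract is simply that the first task to throw this exception takes its whole stage down with it. A minimal standalone sketch of that poison-pill pattern, using a local thread pool; PoisonPill and runStage below are hypothetical stand-ins for illustration, not the DAGScheduler's actual abort path:

  import java.util.concurrent.{Callable, ExecutionException, Executors}

  // Hypothetical stand-in for the real private[spark] PoisonPillException.
  case class PoisonPill(message: String) extends RuntimeException(message)

  // Run every "task" of a stage; as soon as one throws the pill,
  // cancel its siblings and fail the whole "stage".
  def runStage(tasks: Seq[() => Unit]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      val futures = tasks.map { t =>
        pool.submit(new Callable[Unit] { def call(): Unit = t() })
      }
      futures.foreach { f =>
        try f.get()
        catch {
          case e: ExecutionException if e.getCause.isInstanceOf[PoisonPill] =>
            futures.foreach(_.cancel(true)) // abort everything still running
            throw e.getCause
        }
      }
    } finally pool.shutdownNow()
  }

The point of the pattern: a runaway task fails the stage immediately instead of letting its siblings keep burning memory on results that will be discarded anyway.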

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -2604,6 +2604,14 @@ object SQLConf {
       .checkValue(_ > 0, "The timeout value must be positive")
       .createWithDefault(10L)

+  val TASK_MAX_RETURN_SIZE =
+    buildConf("spark.sql.task.return.size.limit")
+      .doc("This is a limit for result tasks (tasks in the final stage of a query execution)." +
+        " To protect executors from OOM, a task whose result size exceeds this limit will" +
+        " exit immediately with an exception.")
+      .intConf
+      .createWithDefault(1 << 30)
+
   val BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT =
     buildConf("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit")
       .internal()
@@ -3258,6 +3266,8 @@ class SQLConf extends Serializable with Logging {
   def maxNumberForTemporaryTablesPerSession: Long =
     getConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION)

+  def maxTaskResultSize: Int = getConf(TASK_MAX_RETURN_SIZE)
+
   def shuffleAdjustPartitionNumThresholdMin: Double =
     getConf(SHUFFLE_ADJUST_PARTITION_NUM_THRESHOLD_MIN)
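
Since the conf is built without .internal(), it should be settable per session. A hedged usage sketch; `spark` names a live SparkSession, and runtime settability is an assumption, not verified here:

  // Tighten the per-task result cap from the 1 << 30 (1 GiB) default to 64 MB.
  spark.conf.set("spark.sql.task.return.size.limit", 64 * 1024 * 1024)
  // Read it back:
  spark.conf.get("spark.sql.task.return.size.limit")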

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 12 additions & 1 deletion
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.mutable.{ArrayBuffer, ListBuffer}

-import org.apache.spark.{broadcast, SparkEnv}
+import org.apache.spark.{broadcast, PoisonPillException, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
@@ -313,6 +313,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   private def getByteArrayRdd(
       n: Int = -1, takeFromEnd: Boolean = false): RDD[(Long, Array[Byte])] = {
+    val sizeLimit = sqlContext.conf.maxTaskResultSize
     execute().mapPartitionsInternal { iter =>
       var count = 0
       val buffer = new Array[Byte](4 << 10) // 4K
@@ -329,6 +330,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       var i = 0
       count = last.length
       while (i < count) {
+        if (bos.size() > sizeLimit) {
+          throw PoisonPillException(
+            s"Returned too many results, please consider adding a LIMIT clause!" +
+              s" Result size for each task should be under $sizeLimit bytes.")
+        }
         val row = last(i)
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
@@ -338,6 +344,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       // `iter.hasNext` may produce one row and buffer it, we should only call it when the
       // limit is not hit.
       while ((n < 0 || count < n) && iter.hasNext) {
+        if (bos.size() > sizeLimit) {
+          throw PoisonPillException(
+            s"Returned too many results, please consider adding a LIMIT clause!" +
+              s" Result size for each task should be under $sizeLimit bytes.")
+        }
         val row = iter.next().asInstanceOf[UnsafeRow]
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
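
The guard itself is just a fail-fast size check on the growing ByteArrayOutputStream before each row is appended. A self-contained sketch of the same technique, with Array[Byte] rows standing in for UnsafeRow and a plain RuntimeException standing in for the poison pill:

  import java.io.{ByteArrayOutputStream, DataOutputStream}

  // Length-prefix each row into an in-memory buffer, failing fast once the
  // buffer outgrows the cap -- the same check the diff adds before each write.
  def serializeWithCap(rows: Iterator[Array[Byte]], sizeLimit: Int): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    rows.foreach { row =>
      if (bos.size() > sizeLimit) {
        throw new RuntimeException( // PoisonPillException in the real patch
          s"Returned too many results, please consider adding a LIMIT clause!" +
            s" Result size for each task should be under $sizeLimit bytes.")
      }
      out.writeInt(row.length) // like out.writeInt(row.getSizeInBytes)
      out.write(row)
    }
    out.flush()
    bos.toByteArray
  }

Note that the check runs at the top of the loop, before the next append, so the buffer can overshoot the limit by at most one row; that mirrors the loop-top placement of the check in getByteArrayRdd.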
