
Commit 9bf7256

7mming7 authored and chenzhx committed

performance issue when gc blocks (apache#189)

Co-authored-by: 7mming7 <[email protected]>
1 parent 8396226 commit 9bf7256

File tree: 10 files changed, +52 -6 lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -1381,7 +1381,7 @@ private[spark] object MapOutputTracker extends Logging {
     // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
     // arr is a nested Array so that it can handle over 2GB serialized data
     val arr = chunkedByteBuf.getChunks().map(_.array())
-    val bcast = broadcastManager.newBroadcast(arr, isLocal)
+    val bcast = broadcastManager.newBroadcast(arr, isLocal, null)
     // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
     // This implementation doesn't reallocate the whole memory block but allocates
     // additional buffers. This way no buffers need to be garbage collected and
```

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 2 deletions

```diff
@@ -1512,10 +1512,13 @@ class SparkContext(config: SparkConf) extends Logging {
     assertNotStopped()
     require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
       "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
-    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
+    val executionId = getLocalProperty("spark.sql.execution.id")
+    val bc = env.broadcastManager.newBroadcast[T](value, isLocal, executionId)
     val callSite = getCallSite
     logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
-    cleaner.foreach(_.registerBroadcastForCleanup(bc))
+    if (executionId == null) {
+      cleaner.foreach(_.registerBroadcastForCleanup(bc))
+    }
     bc
   }
```
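With this change, a broadcast created while a SQL execution is in flight is tagged with that execution's id and skips ContextCleaner registration, since BroadcastManager.cleanBroadCast will release it when the query ends; internal callers such as MapOutputTracker above pass `null` and stay on the ContextCleaner path. A minimal sketch of the gate, assuming an active SparkContext `sc` (illustrative, not part of the patch):

```scala
// Outside any SQL execution the local property is unset, so this broadcast is
// registered with the ContextCleaner and reclaimed only when it becomes unreachable.
assert(sc.getLocalProperty("spark.sql.execution.id") == null)
val b = sc.broadcast(Array(1, 2, 3))

// Inside SQLExecution.withNewExecutionId the property holds the execution id, so
// with this patch the broadcast is recorded under that id instead and released
// eagerly by BroadcastManager.cleanBroadCast once the query finishes.
```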

core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

Lines changed: 25 additions & 1 deletion

```diff
@@ -18,8 +18,10 @@
 package org.apache.spark.broadcast

 import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong

+import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag

 import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
@@ -31,8 +33,11 @@ import org.apache.spark.internal.Logging
 private[spark] class BroadcastManager(
     val isDriver: Boolean, conf: SparkConf) extends Logging {

+  val cleanQueryBroadcast = conf.getBoolean("spark.broadcast.autoClean.enabled", false)
+
   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
+  var cachedBroadcast = new ConcurrentHashMap[String, ListBuffer[Long]]()

   initialize()

@@ -53,14 +58,33 @@ private[spark] class BroadcastManager(

   private val nextBroadcastId = new AtomicLong(0)

+  private[spark] def currentBroadcastId: Long = nextBroadcastId.get()
+
   private[broadcast] val cachedValues =
     Collections.synchronizedMap(
       new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
         .asInstanceOf[java.util.Map[Any, Any]]
     )

-  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
+  def cleanBroadCast(executionId: String): Unit = {
+    if (cachedBroadcast.containsKey(executionId)) {
+      cachedBroadcast.get(executionId)
+        .foreach(broadcastId => unbroadcast(broadcastId, true, false))
+      cachedBroadcast.remove(executionId)
+    }
+  }
+
+  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, executionId: String): Broadcast[T] = {
     val bid = nextBroadcastId.getAndIncrement()
+    if (executionId != null && cleanQueryBroadcast) {
+      if (cachedBroadcast.containsKey(executionId)) {
+        cachedBroadcast.get(executionId) += bid
+      } else {
+        val list = new scala.collection.mutable.ListBuffer[Long]
+        list += bid
+        cachedBroadcast.put(executionId, list)
+      }
+    }
     value_ match {
       case pb: PythonBroadcast =>
         // SPARK-28486: attach this new broadcast variable's id to the PythonBroadcast,
```
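This registry is the heart of the patch: each SQL execution accumulates the ids of the broadcasts it creates, and `cleanBroadCast` drains them in one pass. A standalone sketch of the same pattern follows; the class name and `release` callback are illustrative stand-ins (release corresponds to `unbroadcast(id, removeFromDriver = true, blocking = false)`), and `computeIfAbsent` is used to close the small containsKey/put race that the diff's two-step registration leaves open when jobs of one execution broadcast concurrently:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

// Illustrative stand-in for the cachedBroadcast bookkeeping above.
class ExecutionBroadcastRegistry(release: Long => Unit) {
  private val byExecution = new ConcurrentHashMap[String, ListBuffer[Long]]()

  def register(executionId: String, broadcastId: Long): Unit = {
    // computeIfAbsent makes lookup-or-create atomic, unlike containsKey + put.
    val ids = byExecution.computeIfAbsent(executionId, _ => ListBuffer.empty[Long])
    ids.synchronized { ids += broadcastId }
  }

  def cleanUp(executionId: String): Unit = {
    // remove() returns the previous list (or null), so each id is released once.
    Option(byExecution.remove(executionId)).foreach(_.foreach(release))
  }
}
```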

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 0 deletions

```diff
@@ -854,6 +854,8 @@ private[spark] class DAGScheduler(
       return new JobWaiter[U](this, jobId, 0, resultHandler)
     }

+    val executionId = sc.getLocalProperty("spark.sql.execution.id")
+    logInfo(s"submit job : $jobId, executionId is $executionId")
     assert(partitions.nonEmpty)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
@@ -1317,6 +1319,7 @@ private[spark] class DAGScheduler(
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
     logDebug("submitMissingTasks(" + stage + ")")
+    logInfo(s"submit stage ${stage.id} with jobId: $jobId")

     // Before find missing partition, do the intermediate state clean work first.
     // The operation here can make sure for the partially completed intermediate stage,
```

core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -54,7 +54,7 @@ import org.apache.spark.rdd.RDD
 private[spark] class ResultTask[T, U](
     stageId: Int,
     stageAttemptId: Int,
-    taskBinary: Broadcast[Array[Byte]],
+    val taskBinary: Broadcast[Array[Byte]],
     partition: Partition,
     locs: Seq[TaskLocation],
     val outputId: Int,
```

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -52,7 +52,7 @@ import org.apache.spark.rdd.RDD
 private[spark] class ShuffleMapTask(
     stageId: Int,
     stageAttemptId: Int,
-    taskBinary: Broadcast[Array[Byte]],
+    val taskBinary: Broadcast[Array[Byte]],
     partition: Partition,
     @transient private var locs: Seq[TaskLocation],
     localProperties: Properties,
```
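ResultTask and ShuffleMapTask get the same one-word change: promoting `taskBinary` from a plain constructor parameter to a `val` turns it into a public member, which is what lets TaskSetManager read `taskBinary.id` in the next file. In miniature (hypothetical classes):

```scala
class WithoutVal(binary: Long)   // parameter: visible only inside the class body
class WithVal(val binary: Long)  // val parameter: also a public accessor

// new WithoutVal(1L).binary     // does not compile: binary is not a member
new WithVal(1L).binary           // fine: returns 1L
```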

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 7 additions & 0 deletions

```diff
@@ -564,6 +564,13 @@ private[spark] class TaskSetManager(
     if (isZombie && runningTasks == 0) {
       sched.taskSetFinished(this)
       if (tasksSuccessful == numTasks) {
+        val broadcastId = taskSet.tasks.head match {
+          case resultTask: ResultTask[Any, Any] =>
+            resultTask.taskBinary.id
+          case shuffleMapTask: ShuffleMapTask =>
+            shuffleMapTask.taskBinary.id
+        }
+        SparkEnv.get.broadcastManager.unbroadcast(broadcastId, true, false)
         healthTracker.foreach(_.updateExcludedForSuccessfulTaskSet(
           taskSet.stageId,
           taskSet.stageAttemptId,
```
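Note that this match is non-exhaustive: if the head task were ever neither a ResultTask nor a ShuffleMapTask, it would throw a MatchError. Those are the only two concrete Task subclasses the DAGScheduler creates, so the patch is safe in practice; a defensive sketch of the same cleanup (not what the patch does) would be:

```scala
// Hedged variant: tolerate other Task subtypes instead of risking MatchError.
val broadcastId: Option[Long] = taskSet.tasks.head match {
  case resultTask: ResultTask[_, _]   => Some(resultTask.taskBinary.id)
  case shuffleMapTask: ShuffleMapTask => Some(shuffleMapTask.taskBinary.id)
  case _                              => None
}
broadcastId.foreach(id => SparkEnv.get.broadcastManager.unbroadcast(id, true, false))
```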

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 6 additions & 0 deletions

```diff
@@ -340,6 +340,12 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq

+    val blocksToRemove = blockLocations.keySet().asScala
+      .collect {
+        case broadcastId @ BroadcastBlockId(`broadcastId`, _) =>
+          broadcastId
+      }
+    blocksToRemove.foreach(blockLocations.remove)
     Future.sequence(futures)
   }
```
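The backticked `broadcastId` inside the pattern is Scala's stable-identifier syntax: it compares the block's first field against the value of the enclosing `broadcastId` parameter rather than binding a new variable, while the outer `broadcastId @ ...` captures the whole matching BroadcastBlockId so it can be removed from `blockLocations`. A self-contained illustration of the construct, with made-up values:

```scala
// Mirrors the shape of Spark's storage.BroadcastBlockId for illustration.
case class BroadcastBlockId(broadcastId: Long, field: String)

val broadcastId = 7L  // plays the role of removeBroadcast's parameter
val blocks = Seq(BroadcastBlockId(7L, "piece0"), BroadcastBlockId(8L, "piece0"))

// The backticks match against the existing val above; an unbackticked name
// would bind anything and match every block.
val matching = blocks.collect { case b @ BroadcastBlockId(`broadcastId`, _) => b }
// matching == List(BroadcastBlockId(7, "piece0"))
```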

pom.xml

Lines changed: 1 addition & 0 deletions

```diff
@@ -86,6 +86,7 @@
     <module>common/kvstore</module>
     <module>common/network-common</module>
     <module>common/network-shuffle</module>
+    <module>common/network-yarn</module>
     <module>common/unsafe</module>
     <module>common/tags</module>
     <module>core</module>
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 2 additions & 0 deletions

```diff
@@ -21,6 +21,7 @@ import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
 import java.util.concurrent.atomic.AtomicLong

 import org.apache.spark.SparkContext
+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{PostQueryExecutionForKylin, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -127,6 +128,7 @@ object SQLExecution {
     } finally {
       executionIdToQueryExecution.remove(executionId)
       sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
+      SparkEnv.get.broadcastManager.cleanBroadCast(executionId.toString)
     }
   }
```
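This finally-block hook closes the loop: the execution id that SparkContext read from the local property is the same one, rendered as a String, that keys the registry being drained here. Summarizing the call chain the commit introduces (comment-only sketch; `df` is an assumed DataFrame inside a query run with autoClean enabled):

```scala
// spark.broadcast.autoClean.enabled=true
// 1. SQLExecution.withNewExecutionId sets "spark.sql.execution.id".
// 2. SparkContext.broadcast reads the property and passes it to
//    BroadcastManager.newBroadcast, which records the new id in cachedBroadcast
//    and skips ContextCleaner registration.
// 3. When the body finishes (success or failure), the finally block above calls
//    cleanBroadCast(executionId.toString), which unbroadcasts every recorded id.
df.collect()  // any action run inside withNewExecutionId exercises this path
```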
