Commit 48fd15d

Merge pull request apache#76 from fishcus/kyspark-ke-15774
[kap] apache#15774, clean broadcast after query
2 parents ae92053 + 3103afd

45 files changed: +145 -73 lines


assembly/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

common/network-common/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

common/network-shuffle/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

common/network-yarn/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

common/sketch/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

common/tags/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

common/unsafe/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../../pom.xml</relativePath>
  </parent>

core/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r7</version>
+    <version>2.2.1-kylin-r8</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 37 additions & 22 deletions

@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
 import java.util.Collections
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent._
 
 import scala.collection.JavaConverters._
 
@@ -112,6 +112,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking.shuffle", false)
 
+  private val cleanupTaskThreads = sc.conf.getInt(
+    "spark.cleaner.referenceTracking.cleanupThreadNumber", 100)
+
+  private val cleanupExecutorPool: ExecutorService =
+    ThreadUtils.newDaemonFixedThreadPool(cleanupTaskThreads, "cleanup")
+
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -178,34 +184,43 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
       try {
-        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
-          .map(_.asInstanceOf[CleanupTaskWeakReference])
-        // Synchronize here to avoid being interrupted on stop()
-        synchronized {
-          reference.foreach { ref =>
-            logDebug("Got cleaning task " + ref.task)
-            referenceBuffer.remove(ref)
-            ref.task match {
-              case CleanRDD(rddId) =>
-                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-              case CleanShuffle(shuffleId) =>
-                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-              case CleanBroadcast(broadcastId) =>
-                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-              case CleanAccum(accId) =>
-                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
-              case CleanCheckpoint(rddId) =>
-                doCleanCheckpoint(rddId)
-            }
-          }
+        Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
+          .map(_.asInstanceOf[CleanupTaskWeakReference]).foreach {
+          r =>
+            referenceBuffer.remove(r)
+            runtCleanTask(r)
         }
       } catch {
         case ie: InterruptedException if stopped => // ignore
-        case e: Exception => logError("Error in cleaning thread", e)
+        case e: Exception => logError("Error in cleaning main thread", e)
       }
     }
   }
 
+  private def runtCleanTask(ref: CleanupTaskWeakReference) = {
+    cleanupExecutorPool.submit(new Runnable {
+      override def run(): Unit = {
+        try {
+          ref.task match {
+            case CleanRDD(rddId) =>
+              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+            case CleanShuffle(shuffleId) =>
+              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+            case CleanBroadcast(broadcastId) =>
+              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+            case CleanAccum(accId) =>
+              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            case CleanCheckpoint(rddId) =>
+              doCleanCheckpoint(rddId)
+          }
+        } catch {
+          case ie: InterruptedException if stopped => // ignore
+          case e: Exception => logError("Error in cleaning thread", e)
+        }
+      }
+    })
+  }
+
   /** Perform RDD cleanup. */
   def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
     try {
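The ContextCleaner change above stops executing cleanup tasks inline inside a synchronized block: the single cleaning thread now only drains the reference queue and hands each task to a fixed pool of daemon workers, sized by the new fork-specific spark.cleaner.referenceTracking.cleanupThreadNumber setting (default 100). One slow, blocking cleanup, such as removing a large broadcast, therefore no longer stalls every other cleanup. Below is a minimal, self-contained sketch of the same dispatch pattern using plain java.util.concurrent in place of Spark's ThreadUtils helper; the task types, pool size, and names here are illustrative, not the commit's code.

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object CleanupPoolSketch {

  // Illustrative stand-ins for the cleaner's task types (not the commit's code).
  sealed trait CleanupTask
  final case class CleanBroadcast(id: Long) extends CleanupTask
  final case class CleanRDD(id: Int) extends CleanupTask

  // Roughly what ThreadUtils.newDaemonFixedThreadPool(n, prefix) builds: a
  // fixed-size pool whose threads are daemons and carry a naming prefix.
  private def daemonPool(nThreads: Int, prefix: String): ExecutorService = {
    val count = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + count.incrementAndGet())
        t.setDaemon(true)
        t
      }
    }
    Executors.newFixedThreadPool(nThreads, factory)
  }

  private val pool = daemonPool(nThreads = 4, prefix = "cleanup")

  // The polling thread calls this instead of doing the work inline, so a
  // slow cleanup occupies one worker rather than blocking the whole queue.
  def dispatch(task: CleanupTask): Unit = pool.submit(new Runnable {
    override def run(): Unit = task match {
      case CleanBroadcast(id) => println(s"unpersisting broadcast $id")
      case CleanRDD(id) => println(s"unpersisting RDD $id")
    }
  })

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(i => dispatch(CleanBroadcast(i)))
    Thread.sleep(200) // let the daemon workers finish before the JVM exits
  }
}

Note the trade-off the commit accepts: with the old synchronized block gone, stop() can now race with in-flight cleanups, which appears tolerable here because the workers are daemon threads and each task catches and logs its own errors.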

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion

@@ -623,7 +623,7 @@ private[spark] object MapOutputTracker extends Logging {
     if (arr.length >= minBroadcastSize) {
       // Use broadcast instead.
       // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
-      val bcast = broadcastManager.newBroadcast(arr, isLocal)
+      val bcast = broadcastManager.newBroadcast(arr, isLocal, null)
       // toByteArray creates copy, so we can reuse out
       out.reset()
       out.write(BROADCAST)
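The only change here is the extra argument threaded through BroadcastManager.newBroadcast. The diff does not show the new signature; given the commit title, the third parameter plausibly tags a broadcast with the query that created it so everything the query registered can be released when it finishes, and call sites with no query context, like this internal MapOutputTracker broadcast, pass null. The following is a hypothetical sketch of that bookkeeping, none of it from the fork's actual API.

import scala.collection.mutable

// Hypothetical sketch (not the fork's API): track broadcasts per owner tag,
// e.g. a query id, so a finished query's broadcasts can be released in one
// sweep. A null owner means the broadcast is not tracked per query.
final class BroadcastRegistry[B](unpersist: B => Unit) {
  private val byOwner = mutable.Map.empty[String, mutable.Buffer[B]]

  def register(owner: String, bcast: B): B = synchronized {
    if (owner != null) {
      byOwner.getOrElseUpdate(owner, mutable.Buffer.empty[B]) += bcast
    }
    bcast
  }

  // Invoked when a query ends: unpersist everything it owned.
  def cleanAfterQuery(owner: String): Unit = synchronized {
    byOwner.remove(owner).foreach(_.foreach(unpersist))
  }
}

Under that reading, the null passed above simply opts this broadcast out of per-query tracking.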
