
Commit 1290164

rxin authored and mateiz committed
[SPARK-2704] Name threads in ConnectionManager and mark them as daemon.
handleMessageExecutor, handleReadWriteExecutor, and handleConnectExecutor are not marked as daemon and not named. I think there exists some condition in which Spark programs won't terminate because of this. Stack dump attached in https://issues.apache.org/jira/browse/SPARK-2704

Author: Reynold Xin <[email protected]>

Closes #1604 from rxin/daemon and squashes the following commits:

98d6a6c [Reynold Xin] [SPARK-2704] Name threads in ConnectionManager and mark them as daemon.
1 parent c183b92 commit 1290164
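For context: the fix gives each ThreadPoolExecutor a ThreadFactory that both names its worker threads and marks them as daemon, so idle pool threads cannot keep the JVM alive on their own. The sketch below illustrates that pattern outside of Spark, using Guava's ThreadFactoryBuilder (the same builder Utils.scala wraps). The object name, pool sizes, and the "example" prefix are illustrative assumptions, not code from the commit, and Guava must be on the classpath.

import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object NamedDaemonPoolSketch {
  // Threads from this factory are daemon and named "<prefix>-0", "<prefix>-1", ...
  def namedDaemonThreadFactory(prefix: String): ThreadFactory =
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()

  def main(args: Array[String]): Unit = {
    // Illustrative pool sizes; the real pools in ConnectionManager read theirs from SparkConf.
    val pool = new ThreadPoolExecutor(
      2, 4, 60, TimeUnit.SECONDS,
      new LinkedBlockingDeque[Runnable](),
      namedDaemonThreadFactory("example"))

    pool.execute(new Runnable {
      def run(): Unit = {
        val t = Thread.currentThread()
        println(s"${t.getName} daemon=${t.isDaemon}")  // e.g. "example-0 daemon=true"
      }
    })

    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

Without setDaemon(true), a pool like this keeps the JVM running after the main thread exits unless it is shut down explicitly, which is the non-termination scenario described in SPARK-2704.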

File tree (2 files changed: +23, -13 lines)

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
core/src/main/scala/org/apache/spark/util/Utils.scala


core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 6 additions & 3 deletions
@@ -62,21 +62,24 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     conf.getInt("spark.core.connection.handler.threads.min", 20),
     conf.getInt("spark.core.connection.handler.threads.max", 60),
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable]())
+    new LinkedBlockingDeque[Runnable](),
+    Utils.namedThreadFactory("handle-message-executor"))
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.io.threads.min", 4),
     conf.getInt("spark.core.connection.io.threads.max", 32),
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable]())
+    new LinkedBlockingDeque[Runnable](),
+    Utils.namedThreadFactory("handle-read-write-executor"))
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
   // which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.connect.threads.min", 1),
     conf.getInt("spark.core.connection.connect.threads.max", 8),
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable]())
+    new LinkedBlockingDeque[Runnable](),
+    Utils.namedThreadFactory("handle-connect-executor"))
 
   private val serverChannel = ServerSocketChannel.open()
   // used to track the SendingConnections waiting to do SASL negotiation
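To see the effect of this change at runtime, one hypothetical check (not part of the commit) is to dump the live threads and their daemon flags: with the patch applied, the pool threads should show up as handle-message-executor-0, handle-read-write-executor-0, and so on with daemon=true, instead of anonymous pool-N-thread-M names.

import scala.collection.JavaConverters._

// Diagnostic sketch: print every live JVM thread with its name and daemon flag.
Thread.getAllStackTraces.keySet.asScala.toSeq.sortBy(_.getName).foreach { t =>
  println(s"${t.getName} daemon=${t.isDaemon}")
}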

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 17 additions & 10 deletions
@@ -21,7 +21,7 @@ import java.io._
 import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
 import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
@@ -553,30 +553,37 @@ private[spark] object Utils extends Logging {
     new ThreadFactoryBuilder().setDaemon(true)
 
   /**
-   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
    */
-  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
-    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
-    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
   }
 
   /**
-   * Return the string to tell how long has passed in milliseconds.
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
    */
-  def getUsedTimeMs(startTimeMs: Long): String = {
-    " " + (System.currentTimeMillis - startTimeMs) + " ms"
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
   /**
    * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
    * unique, sequentially assigned integer.
   */
   def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
-    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    val threadFactory = namedThreadFactory(prefix)
     Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
+  /**
+   * Return the string to tell how long has passed in milliseconds.
+   */
+  def getUsedTimeMs(startTimeMs: Long): String = {
+    " " + (System.currentTimeMillis - startTimeMs) + " ms"
+  }
+
   private def listFilesSafely(file: File): Seq[File] = {
     val files = file.listFiles()
     if (files == null) {
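With the refactor above, namedThreadFactory is the single place that sets both the daemon flag and the name format, and the existing pool helpers delegate to it. A caller-side sketch, under the assumption that the caller lives inside the org.apache.spark package (Utils is private[spark]); the "my-..." prefixes are illustrative, not from the commit:

import org.apache.spark.util.Utils

// Daemon threads named my-worker-0, my-worker-1, ...
val factory    = Utils.namedThreadFactory("my-worker")
// The cached and fixed pool helpers now build their thread factories the same way.
val cachedPool = Utils.newDaemonCachedThreadPool("my-cached")
val fixedPool  = Utils.newDaemonFixedThreadPool(8, "my-fixed")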
