Skip to content

Commit d9f56b9

Browse files
allenmaGitHub Enterprise
authored andcommitted
[CARMEL-6030][FollowUp] Fix OutOfMemoryError-original fix may cause task thread stuck (#986)
1 parent b06e3a1 commit d9f56b9

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class FileScanRDD(
8686
protected val fileScanMaxThreadNum = sparkSession.sessionState.conf.fileScanMaxThreadNum
8787
protected val maxInConsumingObjNum = sparkSession.sessionState.conf.parquetColumnBatchNum
8888
@volatile protected var usingMultiThread = false
89+
@transient protected var fileScanExecutor: Option[ThreadPoolExecutor] = None
8990

9091
protected val indexEnabled = sparkSession.sessionState.conf.indexEnabled
9192

@@ -373,20 +374,20 @@ class FileScanRDD(
373374
if (fileNum <= maxThreadNum) {
374375
workersNum = fileNum
375376
}
376-
377-
private val executor = getExecutor
377+
if (fileScanExecutor == null || fileScanExecutor.isEmpty) {
378+
fileScanExecutor = Some(createExecutor)
379+
}
380+
private val executor = fileScanExecutor.get
378381
private val notCompleteFiles = new AtomicInteger(fileNum)
379382

380383
@volatile private var scanException : Exception = null
381384
import scala.collection.JavaConverters._
382385
private val workingInputStatsMap = new ConcurrentHashMap[String, (Long, Long)].asScala
383386
private val completedInputStats = (new LongAccumulator, new LongAccumulator)
384387

385-
private def getExecutor: ThreadPoolExecutor = {
386-
context.getOrCreateObjFromEnv("fileScanExecutor",
387-
{ThreadUtils.newDaemonFixedThreadPool(workersNum,
388-
s"FileScanForPartition ${context.partitionId()}")})
389-
.asInstanceOf[ThreadPoolExecutor]
388+
private def createExecutor: ThreadPoolExecutor = {
389+
ThreadUtils.newDaemonFixedThreadPool(workersNum,
390+
s"FileScanForPartition ${context.partitionId()}")
390391
}
391392

392393
// start to submit scan task

0 commit comments

Comments
 (0)