Skip to content

Commit d74fd8f

Browse files
committed
Merge remote-tracking branch 'upstream/master' into ldaRefactor
2 parents 0bb8400 + a7d65d3 commit d74fd8f

File tree

91 files changed

+616
-439
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+616
-439
lines changed

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
4444
blockManager.get(key) match {
4545
case Some(blockResult) =>
4646
// Partition is already materialized, so just return its values
47-
val inputMetrics = blockResult.inputMetrics
4847
val existingMetrics = context.taskMetrics
49-
.getInputMetricsForReadMethod(inputMetrics.readMethod)
50-
existingMetrics.incBytesRead(inputMetrics.bytesRead)
48+
.getInputMetricsForReadMethod(blockResult.readMethod)
49+
existingMetrics.incBytesRead(blockResult.bytes)
5150

5251
val iter = blockResult.data.asInstanceOf[Iterator[T]]
5352
new InterruptibleIterator[T](context, iter) {

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
package org.apache.spark
1919

20-
import java.util.concurrent.{Executors, TimeUnit}
20+
import java.util.concurrent.TimeUnit
2121

2222
import scala.collection.mutable
2323

2424
import org.apache.spark.scheduler._
25-
import org.apache.spark.util.{Clock, SystemClock, Utils}
25+
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
2626

2727
/**
2828
* An agent that dynamically allocates and removes executors based on the workload.
@@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager(
132132
private val listener = new ExecutorAllocationListener
133133

134134
// Executor that handles the scheduling task.
135-
private val executor = Executors.newSingleThreadScheduledExecutor(
136-
Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
135+
private val executor =
136+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
137137

138138
/**
139139
* Verify that the settings specified through the config are valid.

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717

1818
package org.apache.spark
1919

20-
import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
20+
import java.util.concurrent.{ScheduledFuture, TimeUnit}
2121

2222
import scala.collection.mutable
2323

2424
import org.apache.spark.executor.TaskMetrics
2525
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
2626
import org.apache.spark.storage.BlockManagerId
2727
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
28-
import org.apache.spark.util.Utils
28+
import org.apache.spark.util.{ThreadUtils, Utils}
2929

3030
/**
3131
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
7676

7777
private var timeoutCheckingTask: ScheduledFuture[_] = null
7878

79-
private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
80-
Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
79+
private val timeoutCheckingThread =
80+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
8181

82-
private val killExecutorThread = Executors.newSingleThreadExecutor(
83-
Utils.namedThreadFactory("kill-executor-thread"))
82+
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
8483

8584
override def onStart(): Unit = {
8685
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
3232
import org.apache.spark.io.CompressionCodec
3333
import org.apache.spark.scheduler._
3434
import org.apache.spark.ui.SparkUI
35-
import org.apache.spark.util.Utils
35+
import org.apache.spark.util.{ThreadUtils, Utils}
3636
import org.apache.spark.{Logging, SecurityManager, SparkConf}
3737

3838

@@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
9999
*/
100100
private val replayExecutor: ExecutorService = {
101101
if (!conf.contains("spark.testing")) {
102-
Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
102+
ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
103103
} else {
104104
MoreExecutors.sameThreadExecutor()
105105
}

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.io.File
2121
import java.lang.management.ManagementFactory
2222
import java.net.URL
2323
import java.nio.ByteBuffer
24-
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
24+
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2525

2626
import scala.collection.JavaConversions._
2727
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -76,7 +76,7 @@ private[spark] class Executor(
7676
}
7777

7878
// Start worker thread pool
79-
private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
79+
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
8080
private val executorSource = new ExecutorSource(threadPool, executorId)
8181

8282
if (!isLocal) {
@@ -110,8 +110,7 @@ private[spark] class Executor(
110110
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
111111

112112
// Executor for the heartbeat task.
113-
private val heartbeater = Executors.newSingleThreadScheduledExecutor(
114-
Utils.namedThreadFactory("driver-heartbeater"))
113+
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
115114

116115
startDriverHeartbeater()
117116

core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
3636

3737
import org.apache.spark._
3838
import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
39-
import org.apache.spark.util.Utils
39+
import org.apache.spark.util.{ThreadUtils, Utils}
4040

4141
import scala.util.Try
4242
import scala.util.control.NonFatal
@@ -79,7 +79,7 @@ private[nio] class ConnectionManager(
7979

8080
private val selector = SelectorProvider.provider.openSelector()
8181
private val ackTimeoutMonitor =
82-
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
82+
new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor"))
8383

8484
private val ackTimeout =
8585
conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
@@ -102,7 +102,7 @@ private[nio] class ConnectionManager(
102102
handlerThreadCount,
103103
conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
104104
new LinkedBlockingDeque[Runnable](),
105-
Utils.namedThreadFactory("handle-message-executor")) {
105+
ThreadUtils.namedThreadFactory("handle-message-executor")) {
106106

107107
override def afterExecute(r: Runnable, t: Throwable): Unit = {
108108
super.afterExecute(r, t)
@@ -117,7 +117,7 @@ private[nio] class ConnectionManager(
117117
ioThreadCount,
118118
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
119119
new LinkedBlockingDeque[Runnable](),
120-
Utils.namedThreadFactory("handle-read-write-executor")) {
120+
ThreadUtils.namedThreadFactory("handle-read-write-executor")) {
121121

122122
override def afterExecute(r: Runnable, t: Throwable): Unit = {
123123
super.afterExecute(r, t)
@@ -134,7 +134,7 @@ private[nio] class ConnectionManager(
134134
connectThreadCount,
135135
conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
136136
new LinkedBlockingDeque[Runnable](),
137-
Utils.namedThreadFactory("handle-connect-executor")) {
137+
ThreadUtils.namedThreadFactory("handle-connect-executor")) {
138138

139139
override def afterExecute(r: Runnable, t: Throwable): Unit = {
140140
super.afterExecute(r, t)
@@ -160,7 +160,7 @@ private[nio] class ConnectionManager(
160160
private val registerRequests = new SynchronizedQueue[SendingConnection]
161161

162162
implicit val futureExecContext = ExecutionContext.fromExecutor(
163-
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
163+
ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context"))
164164

165165
@volatile
166166
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
1919

2020
import java.io.NotSerializableException
2121
import java.util.Properties
22-
import java.util.concurrent.{TimeUnit, Executors}
22+
import java.util.concurrent.TimeUnit
2323
import java.util.concurrent.atomic.AtomicInteger
2424

2525
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
@@ -129,7 +129,7 @@ class DAGScheduler(
129129
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
130130

131131
private val messageScheduler =
132-
Executors.newScheduledThreadPool(1, Utils.namedThreadFactory("dag-scheduler-message"))
132+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
133133

134134
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
135135
taskScheduler.setDAGScheduler(this)

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
2626
import org.apache.spark._
2727
import org.apache.spark.TaskState.TaskState
2828
import org.apache.spark.serializer.SerializerInstance
29-
import org.apache.spark.util.Utils
29+
import org.apache.spark.util.{ThreadUtils, Utils}
3030

3131
/**
3232
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
@@ -35,7 +35,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
3535
extends Logging {
3636

3737
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
38-
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
38+
private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
3939
THREADS, "task-result-getter")
4040

4141
protected val serializer = new ThreadLocal[SerializerInstance] {

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler.cluster
1919

20-
import java.util.concurrent.{TimeUnit, Executors}
20+
import java.util.concurrent.TimeUnit
2121
import java.util.concurrent.atomic.AtomicInteger
2222

2323
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -26,7 +26,7 @@ import org.apache.spark.rpc._
2626
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
2727
import org.apache.spark.scheduler._
2828
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
29-
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
29+
import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
3030

3131
/**
3232
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
7373
private val addressToExecutorId = new HashMap[RpcAddress, String]
7474

7575
private val reviveThread =
76-
Executors.newSingleThreadScheduledExecutor(Utils.namedThreadFactory("driver-revive-thread"))
76+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
7777

7878
override def onStart() {
7979
// Periodically revive offers to allow delay scheduling to work

core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
2424
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
2525
import org.apache.spark.scheduler.TaskSchedulerImpl
2626
import org.apache.spark.ui.JettyUtils
27-
import org.apache.spark.util.{RpcUtils, Utils}
27+
import org.apache.spark.util.{ThreadUtils, RpcUtils}
2828

2929
import scala.util.control.NonFatal
3030

@@ -97,7 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
9797
private var amEndpoint: Option[RpcEndpointRef] = None
9898

9999
private val askAmThreadPool =
100-
Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
100+
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
101101
implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
102102

103103
override def receive: PartialFunction[Any, Unit] = {

0 commit comments

Comments
 (0)