
Commit 2b50502

Commit message: rebase
Merge of 2 parents: cb22863 + a72c0d4


134 files changed (+3344, -1867 lines)


bin/compute-classpath.cmd

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 rem Build up classpath
 set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%
 
-if "x%SPARK_CONF_DIR%"!="x" (
+if not "x%SPARK_CONF_DIR%"=="x" (
   set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
 ) else (
   set CLASSPATH=%CLASSPATH%;%FWDIR%conf

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 5 additions & 0 deletions
@@ -51,6 +51,11 @@ table.sortable thead {
   cursor: pointer;
 }
 
+table.sortable td {
+  word-wrap: break-word;
+  max-width: 600px;
+}
+
 .progress {
   margin-bottom: 0px; position: relative
 }

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 0 additions & 2 deletions
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         arr.iterator.asInstanceOf[Iterator[T]]
       case Right(it) =>
         // There is not enough space to cache this partition in memory
-        logWarning(s"Not enough space to cache partition $key in memory! " +
-          s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
         val returnValues = it.asInstanceOf[Iterator[T]]
         if (putLevel.useDisk) {
           logWarning(s"Persisting partition $key to disk instead.")

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 6 deletions
@@ -779,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param)
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
    * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
    * access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
   */
-  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param, Some(name))
 
   /**
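
For context, a minimal sketch (not part of the commit) of how the corrected type-parameter order reads at a call site. The AccumulableParam instance and the SparkContext `sc` below are assumed purely for illustration; R is the accumulator's result type and T is the element type that tasks add with `+=`:

    import org.apache.spark.AccumulableParam

    // Hypothetical accumulable that collects Ints into a Vector[Int]
    implicit val vecParam: AccumulableParam[Vector[Int], Int] =
      new AccumulableParam[Vector[Int], Int] {
        def addAccumulator(acc: Vector[Int], elem: Int): Vector[Int] = acc :+ elem  // add a T to an R
        def addInPlace(a: Vector[Int], b: Vector[Int]): Vector[Int] = a ++ b        // merge partial results
        def zero(initial: Vector[Int]): Vector[Int] = Vector.empty                  // identity value
      }

    // With the fix, the result type (R) comes first and the element type (T) second.
    val acc = sc.accumulable[Vector[Int], Int](Vector.empty[Int])
    sc.parallelize(1 to 5).foreach(x => acc += x)  // tasks add Ints
    println(acc.value)                             // the driver reads the Vector[Int]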

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 11 deletions
@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 * :: DeveloperApi ::
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
+ * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
 *
 * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
 * in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
 }
 
 object SparkEnv extends Logging {
-  private val env = new ThreadLocal[SparkEnv]
-  @volatile private var lastSetSparkEnv : SparkEnv = _
+  @volatile private var env: SparkEnv = _
 
   private[spark] val driverActorSystemName = "sparkDriver"
   private[spark] val executorActorSystemName = "sparkExecutor"
 
   def set(e: SparkEnv) {
-    lastSetSparkEnv = e
-    env.set(e)
+    env = e
   }
 
   /**
-   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
-   * previously set in any thread.
+   * Returns the SparkEnv.
   */
  def get: SparkEnv = {
-    Option(env.get()).getOrElse(lastSetSparkEnv)
+    env
  }
 
  /**
   * Returns the ThreadLocal SparkEnv.
   */
+  @deprecated("Use SparkEnv.get instead", "1.2")
  def getThreadLocal: SparkEnv = {
-    env.get()
+    env
  }
 
  private[spark] def create(
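
As a quick illustration (not from the diff), with the thread-local gone SparkEnv.get returns the same instance from any thread; a minimal sketch, assuming a SparkContext has already been created so the environment is initialized:

    import org.apache.spark.SparkEnv

    val env = SparkEnv.get                     // same SparkEnv regardless of the calling thread
    val ser = env.serializer.newInstance()     // e.g. grab the configured serializer instance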

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 5 additions & 1 deletion
@@ -196,7 +196,6 @@ private[spark] class PythonRDD(
 
     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
-        SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
         // Partition index
@@ -248,6 +247,11 @@
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
           worker.shutdownOutput()
+      } finally {
+        // Release memory used by this thread for shuffles
+        env.shuffleMemoryManager.releaseMemoryForThisThread()
+        // Release memory used by this thread for unrolling blocks
+        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
       }
     }
   }

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 15 additions & 10 deletions
@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def write(id: Long, value: Any) {
     val file = getFile(id)
-    val out: OutputStream = {
-      if (compress) {
-        compressionCodec.compressedOutputStream(new FileOutputStream(file))
-      } else {
-        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val fileOutputStream = new FileOutputStream(file)
+    try {
+      val out: OutputStream = {
+        if (compress) {
+          compressionCodec.compressedOutputStream(fileOutputStream)
+        } else {
+          new BufferedOutputStream(fileOutputStream, bufferSize)
+        }
       }
+      val ser = SparkEnv.get.serializer.newInstance()
+      val serOut = ser.serializeStream(out)
+      serOut.writeObject(value)
+      serOut.close()
+      files += file
+    } finally {
+      fileOutputStream.close()
     }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serOut = ser.serializeStream(out)
-    serOut.writeObject(value)
-    serOut.close()
-    files += file
   }
 
   private def read[T: ClassTag](id: Long): T = {

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       System.exit(-1)
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      println(s"Cause was: $cause")
      System.exit(-1)

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 1 addition & 1 deletion
@@ -154,7 +154,7 @@ private[spark] class AppClient(
       logWarning(s"Connection to $address failed; waiting for master to reconnect...")
       markDisconnected()
 
-    case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+    case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
       logWarning(s"Could not connect to $address: $cause")
 
     case StopAppClient =>

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 10 additions & 4 deletions
@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
     val serialized = serializer.toBinary(value)
 
     val out = new FileOutputStream(file)
-    out.write(serialized)
-    out.close()
+    try {
+      out.write(serialized)
+    } finally {
+      out.close()
+    }
   }
 
   def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
-    dis.readFully(fileData)
-    dis.close()
+    try {
+      dis.readFully(fileData)
+    } finally {
+      dis.close()
+    }
 
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
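
Several of the files touched here adopt the same resource-safety pattern; a minimal standalone sketch of it (the writeBytes helper is hypothetical, not part of the commit), showing the stream being closed in finally so it is released even when the write throws:

    import java.io.{File, FileOutputStream}

    def writeBytes(file: File, bytes: Array[Byte]): Unit = {
      val out = new FileOutputStream(file)
      try {
        out.write(bytes)      // may throw an IOException
      } finally {
        out.close()           // runs on both the normal and the exceptional path
      }
    }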
