
Commit a3eb717

Merge branch 'master' of github.com:apache/spark into SPARK-5811

2 parents: c60156d + c06e42f

180 files changed: 4563 additions and 1916 deletions

assembly/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -114,6 +114,16 @@
           <exclude>META-INF/*.RSA</exclude>
         </excludes>
       </filter>
+      <filter>
+        <!-- Exclude libgfortran, libgcc for license issues -->
+        <artifact>org.jblas:jblas</artifact>
+        <excludes>
+          <!-- Linux amd64 is OK; not statically linked -->
+          <exclude>lib/Linux/i386/**</exclude>
+          <exclude>lib/Mac OS X/**</exclude>
+          <exclude>lib/Windows/**</exclude>
+        </excludes>
+      </filter>
     </filters>
   </configuration>
   <executions>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 0 deletions
@@ -1363,6 +1363,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     cleaner.foreach(_.stop())
     dagScheduler.stop()
     dagScheduler = null
+    progressBar.foreach(_.stop())
     taskScheduler = null
     // TODO: Cache.stop()?
     env.stop()
core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.DataOutputStream
+import java.net.Socket
+
+import py4j.GatewayServer
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Process that starts a Py4J GatewayServer on an ephemeral port and communicates the bound port
+ * back to its caller via a callback port specified by the caller.
+ *
+ * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py).
+ */
+private[spark] object PythonGatewayServer extends Logging {
+  def main(args: Array[String]): Unit = Utils.tryOrExit {
+    // Start a GatewayServer on an ephemeral port
+    val gatewayServer: GatewayServer = new GatewayServer(null, 0)
+    gatewayServer.start()
+    val boundPort: Int = gatewayServer.getListeningPort
+    if (boundPort == -1) {
+      logError("GatewayServer failed to bind; exiting")
+      System.exit(1)
+    } else {
+      logDebug(s"Started PythonGatewayServer on port $boundPort")
+    }
+
+    // Communicate the bound port back to the caller via the caller-specified callback port
+    val callbackHost = sys.env("_PYSPARK_DRIVER_CALLBACK_HOST")
+    val callbackPort = sys.env("_PYSPARK_DRIVER_CALLBACK_PORT").toInt
+    logDebug(s"Communicating GatewayServer port to Python driver at $callbackHost:$callbackPort")
+    val callbackSocket = new Socket(callbackHost, callbackPort)
+    val dos = new DataOutputStream(callbackSocket.getOutputStream)
+    dos.writeInt(boundPort)
+    dos.close()
+    callbackSocket.close()
+
+    // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies:
+    while (System.in.read() != -1) {
+      // Do nothing
+    }
+    logDebug("Exiting due to broken pipe from Python driver")
+    System.exit(0)
+  }
+}
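
For context, the other half of this handshake lives in PySpark's java_gateway.py, which is not part of this diff. The sketch below is an illustrative Scala rendering of the caller's side of the protocol, assuming only what the file above shows: the caller listens on a callback port, exports _PYSPARK_DRIVER_CALLBACK_HOST and _PYSPARK_DRIVER_CALLBACK_PORT, launches PythonGatewayServer via spark-submit, then reads one 4-byte integer carrying the bound Py4J port. The object and variable names here are made up.

import java.io.DataInputStream
import java.net.ServerSocket

object CallbackListenerSketch {
  def main(args: Array[String]): Unit = {
    // Listen on an ephemeral port; its number would be passed to the child process
    // via the two _PYSPARK_DRIVER_CALLBACK_* environment variables before launching
    // PythonGatewayServer.
    val server = new ServerSocket(0)
    println(s"Callback port: ${server.getLocalPort}")

    // PythonGatewayServer connects back and writes a single 4-byte int: the port
    // its Py4J GatewayServer actually bound to.
    val socket = server.accept()
    val in = new DataInputStream(socket.getInputStream)
    val gatewayPort = in.readInt()
    println(s"Py4J gateway is listening on port $gatewayPort")

    in.close()
    socket.close()
    server.close()
  }
}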

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 19 additions & 6 deletions
@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
         stream.readFully(update)
         accumulator += Collections.singletonList(update)
       }
+
       // Check whether the worker is ready to be re-used.
-      if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-        if (reuse_worker) {
-          env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-          released = true
+      if (reuse_worker) {
+        // The ending mark is very likely already available, and the current task
+        // should not be blocked by checking for it.
+
+        if (stream.available() >= 4) {
+          val ending = stream.readInt()
+          if (ending == SpecialLengths.END_OF_STREAM) {
+            env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+            released = true
+            logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
+          } else {
+            logInfo(s"Communication with worker did not end cleanly " +
+              s"(ending with $ending), close it: $worker")
+          }
+        } else {
+          logInfo(s"The ending mark from worker is not available, close it: $worker")
         }
       }
       null
@@ -248,13 +261,13 @@ private[spark] class PythonRDD(
     } catch {
       case e: Exception if context.isCompleted || context.isInterrupted =>
         logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-        worker.shutdownOutput()
+        Utils.tryLog(worker.shutdownOutput())

       case e: Exception =>
         // We must avoid throwing exceptions here, because the thread uncaught exception handler
         // will kill the whole executor (see org.apache.spark.executor.Executor).
         _exception = e
-        worker.shutdownOutput()
+        Utils.tryLog(worker.shutdownOutput())
     } finally {
       // Release memory used by this thread for shuffles
       env.shuffleMemoryManager.releaseMemoryForThisThread()
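
The first hunk's core idea is that the writer thread only consumes the end-of-stream marker when it is already buffered, so a slow or dead worker cannot block task completion. A minimal, self-contained sketch of that non-blocking check follows; the constant value and the in-memory stream are stand-ins, not Spark's actual objects.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object NonBlockingEndMarkSketch {
  val END_OF_STREAM = -4  // stand-in for SpecialLengths.END_OF_STREAM

  def main(args: Array[String]): Unit = {
    // Simulate a worker that wrote its end-of-stream marker before we check.
    val buf = new ByteArrayOutputStream()
    new DataOutputStream(buf).writeInt(END_OF_STREAM)
    val stream = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))

    // available() tells us whether the 4-byte marker is already buffered; if it is
    // not, we skip the read instead of blocking on a slow or dead worker.
    if (stream.available() >= 4 && stream.readInt() == END_OF_STREAM) {
      println("worker ended cleanly, safe to reuse")
    } else {
      println("no clean ending mark, close the worker")
    }
  }
}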

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 3 deletions
@@ -39,7 +39,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
-import org.apache.spark.executor._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

 /**
@@ -302,8 +301,7 @@
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
-        args.mainClass = "py4j.GatewayServer"
-        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
       } else {
         // If a python file is provided, add it to the child arguments and list of files to deploy.
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 8 additions & 8 deletions
@@ -17,8 +17,6 @@

 package org.apache.spark.deploy.rest

-import scala.util.Try
-
 import com.fasterxml.jackson.annotation._
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
 import com.fasterxml.jackson.annotation.JsonInclude.Include
@@ -111,12 +109,14 @@ private[spark] object SubmitRestProtocolMessage {
   * If the action field is not found, throw a [[SubmitRestMissingFieldException]].
   */
  def parseAction(json: String): String = {
-    parse(json).asInstanceOf[JObject].obj
-      .find { case (f, _) => f == "action" }
-      .map { case (_, v) => v.asInstanceOf[JString].s }
-      .getOrElse {
-        throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
-      }
+    val value: Option[String] = parse(json) match {
+      case JObject(fields) =>
+        fields.collectFirst { case ("action", v) => v }.collect { case JString(s) => s }
+      case _ => None
+    }
+    value.getOrElse {
+      throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
+    }
  }

  /**
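
The rewritten parseAction drops the asInstanceOf casts, so JSON that parses but is not an object (or whose action value is not a string) now falls through to the missing-field error instead of throwing a ClassCastException. Below is a hedged, standalone sketch of the same extraction against json4s; extractAction and the sample inputs are illustrative, not Spark API.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ParseActionSketch {
  def extractAction(json: String): Option[String] = parse(json) match {
    case JObject(fields) =>
      // Take the first "action" field, and only if its value is a JSON string.
      fields.collectFirst { case ("action", v) => v }.collect { case JString(s) => s }
    case _ => None  // e.g. a bare JSON string or array: report a missing field, don't crash
  }

  def main(args: Array[String]): Unit = {
    println(extractAction("""{"action": "CreateSubmissionRequest"}"""))  // Some(CreateSubmissionRequest)
    println(extractAction("\"hello, world\""))                           // None
  }
}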

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 9 additions & 4 deletions
@@ -49,7 +49,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

-  addShutdownHook()
+  private val shutdownHook = addShutdownHook()

   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
@@ -134,17 +134,22 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     }
   }

-  private def addShutdownHook() {
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
+  private def addShutdownHook(): Thread = {
+    val shutdownHook = new Thread("delete Spark local dirs") {
       override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         DiskBlockManager.this.stop()
       }
-    })
+    }
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    shutdownHook
   }

   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
+    // Remove the shutdown hook. It causes memory leaks if we leave it around.
+    Runtime.getRuntime.removeShutdownHook(shutdownHook)
+
     // Only perform cleanup if an external service is not serving our shuffle files.
     if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
       localDirs.foreach { localDir =>
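
The pattern adopted here is worth spelling out: keep a handle to the shutdown hook you register so that stop() can deregister it, because a registered hook stays strongly reachable, along with everything it closes over, until the JVM exits. A minimal sketch of that register-then-remove pattern with hypothetical names (LocalDirCleaner, cleanUp); only the Runtime calls are real API.

class LocalDirCleaner {
  private val hook = new Thread("delete local dirs (sketch)") {
    override def run(): Unit = cleanUp()
  }
  Runtime.getRuntime.addShutdownHook(hook)

  private def cleanUp(): Unit = {
    // delete temporary directories, close handles, etc.
  }

  def stop(): Unit = {
    // Deregister first: a registered hook keeps a strong reference to this instance
    // alive until JVM exit, which is a leak in long-running processes that create
    // and stop many such objects.
    Runtime.getRuntime.removeShutdownHook(hook)
    cleanUp()
  }
}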

core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

Lines changed: 6 additions & 1 deletion
@@ -28,7 +28,6 @@ import org.apache.spark._
  * of them will be combined together, showed in one line.
  */
 private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
-
   // Carrige return
   val CR = '\r'
   // Update period of progress bar, in milliseconds
@@ -121,4 +120,10 @@
     clear()
     lastFinishTime = System.currentTimeMillis()
   }
+
+  /**
+   * Tear down the timer thread. The timer thread is a GC root, and it retains the entire
+   * SparkContext if it's not terminated.
+   */
+  def stop(): Unit = timer.cancel()
 }
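
The new stop() exists because a java.util.Timer runs its tasks on a dedicated background thread, and a live thread is a GC root: until the timer is cancelled, the scheduled task, and through it the ConsoleProgressBar and its SparkContext, remains reachable. A small illustrative sketch of that lifecycle with made-up names:

import java.util.{Timer, TimerTask}

class ProgressReporterSketch(label: String) {
  // The Timer spawns its own thread; that thread keeps the scheduled task, and
  // through it this ProgressReporterSketch instance, strongly reachable.
  private val timer = new Timer(s"refresh $label", /* isDaemon = */ true)

  timer.schedule(new TimerTask {
    override def run(): Unit = println(s"[$label] still running...")
  }, 0L, 200L)

  // Without this, the instance (and whatever it references) is never collected,
  // even after all other references to it are dropped.
  def stop(): Unit = timer.cancel()
}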

core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -245,13 +245,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val goodJson = constructSubmitRequest(masterUrl).toJson
     val badJson1 = goodJson.replaceAll("action", "fraction") // invalid JSON
     val badJson2 = goodJson.substring(goodJson.size / 2) // malformed JSON
+    val notJson = "\"hello, world\""
     val (response1, code1) = sendHttpRequestWithResponse(submitRequestPath, "POST") // missing JSON
     val (response2, code2) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson1)
     val (response3, code3) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson2)
     val (response4, code4) = sendHttpRequestWithResponse(killRequestPath, "POST") // missing ID
     val (response5, code5) = sendHttpRequestWithResponse(s"$killRequestPath/", "POST")
     val (response6, code6) = sendHttpRequestWithResponse(statusRequestPath, "GET") // missing ID
     val (response7, code7) = sendHttpRequestWithResponse(s"$statusRequestPath/", "GET")
+    val (response8, code8) = sendHttpRequestWithResponse(submitRequestPath, "POST", notJson)
     // these should all fail as error responses
     getErrorResponse(response1)
     getErrorResponse(response2)
@@ -260,13 +262,15 @@
     getErrorResponse(response5)
     getErrorResponse(response6)
     getErrorResponse(response7)
+    getErrorResponse(response8)
     assert(code1 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code2 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code3 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code4 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
+    assert(code8 === HttpServletResponse.SC_BAD_REQUEST)
   }

   test("bad request paths") {

core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala

Lines changed: 10 additions & 2 deletions
@@ -28,7 +28,7 @@ import org.scalatest.FunSuite

 import org.apache.hadoop.io.Text

-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.util.Utils
 import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}

@@ -42,7 +42,15 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var factory: CompressionCodecFactory = _

   override def beforeAll() {
-    sc = new SparkContext("local", "test")
+    // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
+    // can cause FileSystem.get(Configuration) to return a cached instance created with a different
+    // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
+    // hard-to-reproduce test failures, since any suites that were run after this one would inherit
+    // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
+    // we disable FileSystem caching in this suite.
+    val conf = new SparkConf().set("spark.hadoop.fs.file.impl.disable.cache", "true")
+
+    sc = new SparkContext("local", "test", conf)

     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
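
The workaround leans on two behaviors: Spark copies spark.hadoop.* settings into the Hadoop Configuration it builds, and Hadoop's fs.<scheme>.impl.disable.cache property makes FileSystem.get construct a fresh instance instead of returning the process-wide cached one, whose cache key ignores the Configuration (HADOOP-8490). The sketch below shows how that caching can be observed directly against the Hadoop API; the object name is made up, and the expected results in comments reflect the HADOOP-8490 behavior described in the diff's own comment.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheSketch {
  def main(args: Array[String]): Unit = {
    // With the default cache, the Configuration is not part of the cache key:
    // both calls return the same LocalFileSystem instance, so settings applied
    // through one Configuration can leak into code that passed the other one.
    val a = new Configuration()
    val b = new Configuration()
    b.setLong("fs.local.block.size", 32)
    println(FileSystem.get(a) eq FileSystem.get(b))  // expected: true (same cached instance)

    // Disabling the cache for the "file" scheme gives each call its own instance.
    val c = new Configuration()
    c.setBoolean("fs.file.impl.disable.cache", true)
    println(FileSystem.get(c) eq FileSystem.get(c))  // expected: false (fresh FileSystem per call)
  }
}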
