Skip to content

Commit 2bbac9c

Browse files
committed
Merge remote-tracking branch 'origin/master' into sql-external-sort
2 parents 35dad9f + 11e22b7 commit 2bbac9c

File tree

194 files changed

+706
-3479
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

194 files changed

+706
-3479
lines changed

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ trait Logging {
121121
if (usingLog4j12) {
122122
val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
123123
if (!log4j12Initialized) {
124+
// scalastyle:off println
124125
if (Utils.isInInterpreter) {
125126
val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
126127
Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
@@ -141,6 +142,7 @@ trait Logging {
141142
System.err.println(s"Spark was unable to load $defaultLogProps")
142143
}
143144
}
145+
// scalastyle:on println
144146
}
145147
}
146148
Logging.initialized = true

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ private[spark] class RBackend {
9595
private[spark] object RBackend extends Logging {
9696
def main(args: Array[String]): Unit = {
9797
if (args.length < 1) {
98+
// scalastyle:off println
9899
System.err.println("Usage: RBackend <tempFilePath>")
100+
// scalastyle:on println
99101
System.exit(-1)
100102
}
101103
val sparkRBackend = new RBackend()

core/src/main/scala/org/apache/spark/api/r/RRDD.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
161161
dataOut.write(elem.asInstanceOf[Array[Byte]])
162162
} else if (deserializer == SerializationFormats.STRING) {
163163
// write string(for StringRRDD)
164+
// scalastyle:off println
164165
printOut.println(elem)
166+
// scalastyle:on println
165167
}
166168
}
167169

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,26 +118,26 @@ private class ClientEndpoint(
118118
def pollAndReportStatus(driverId: String) {
119119
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
120120
// is fine.
121-
println("... waiting before polling master for driver state")
121+
logInfo("... waiting before polling master for driver state")
122122
Thread.sleep(5000)
123-
println("... polling master for driver state")
123+
logInfo("... polling master for driver state")
124124
val statusResponse =
125125
activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
126126
statusResponse.found match {
127127
case false =>
128-
println(s"ERROR: Cluster master did not recognize $driverId")
128+
logError(s"ERROR: Cluster master did not recognize $driverId")
129129
System.exit(-1)
130130
case true =>
131-
println(s"State of $driverId is ${statusResponse.state.get}")
131+
logInfo(s"State of $driverId is ${statusResponse.state.get}")
132132
// Worker node, if present
133133
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
134134
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
135-
println(s"Driver running on $hostPort ($id)")
135+
logInfo(s"Driver running on $hostPort ($id)")
136136
case _ =>
137137
}
138138
// Exception, if present
139139
statusResponse.exception.map { e =>
140-
println(s"Exception from cluster was: $e")
140+
logError(s"Exception from cluster was: $e")
141141
e.printStackTrace()
142142
System.exit(-1)
143143
}
@@ -148,7 +148,7 @@ private class ClientEndpoint(
148148
override def receive: PartialFunction[Any, Unit] = {
149149

150150
case SubmitDriverResponse(master, success, driverId, message) =>
151-
println(message)
151+
logInfo(message)
152152
if (success) {
153153
activeMasterEndpoint = master
154154
pollAndReportStatus(driverId.get)
@@ -158,7 +158,7 @@ private class ClientEndpoint(
158158

159159

160160
case KillDriverResponse(master, driverId, success, message) =>
161-
println(message)
161+
logInfo(message)
162162
if (success) {
163163
activeMasterEndpoint = master
164164
pollAndReportStatus(driverId)
@@ -169,32 +169,32 @@ private class ClientEndpoint(
169169

170170
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
171171
if (!lostMasters.contains(remoteAddress)) {
172-
println(s"Error connecting to master $remoteAddress.")
172+
logError(s"Error connecting to master $remoteAddress.")
173173
lostMasters += remoteAddress
174174
// Note that this heuristic does not account for the fact that a Master can recover within
175175
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
176176
// is not currently a concern, however, because this client does not retry submissions.
177177
if (lostMasters.size >= masterEndpoints.size) {
178-
println("No master is available, exiting.")
178+
logError("No master is available, exiting.")
179179
System.exit(-1)
180180
}
181181
}
182182
}
183183

184184
override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
185185
if (!lostMasters.contains(remoteAddress)) {
186-
println(s"Error connecting to master ($remoteAddress).")
187-
println(s"Cause was: $cause")
186+
logError(s"Error connecting to master ($remoteAddress).")
187+
logError(s"Cause was: $cause")
188188
lostMasters += remoteAddress
189189
if (lostMasters.size >= masterEndpoints.size) {
190-
println("No master is available, exiting.")
190+
logError("No master is available, exiting.")
191191
System.exit(-1)
192192
}
193193
}
194194
}
195195

196196
override def onError(cause: Throwable): Unit = {
197-
println(s"Error processing messages, exiting.")
197+
logError(s"Error processing messages, exiting.")
198198
cause.printStackTrace()
199199
System.exit(-1)
200200
}
@@ -209,10 +209,12 @@ private class ClientEndpoint(
209209
*/
210210
object Client {
211211
def main(args: Array[String]) {
212+
// scalastyle:off println
212213
if (!sys.props.contains("SPARK_SUBMIT")) {
213214
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
214215
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
215216
}
217+
// scalastyle:on println
216218

217219
val conf = new SparkConf()
218220
val driverArgs = new ClientArguments(args)

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ private[deploy] class ClientArguments(args: Array[String]) {
7272
cmd = "launch"
7373

7474
if (!ClientArguments.isValidJarUrl(_jarUrl)) {
75+
// scalastyle:off println
7576
println(s"Jar url '${_jarUrl}' is not in valid format.")
7677
println(s"Must be a jar file path in URL format " +
7778
"(e.g. hdfs://host:port/XX.jar, file:///XX.jar)")
79+
// scalastyle:on println
7880
printUsageAndExit(-1)
7981
}
8082

@@ -110,7 +112,9 @@ private[deploy] class ClientArguments(args: Array[String]) {
110112
| (default: $DEFAULT_SUPERVISE)
111113
| -v, --verbose Print more debugging output
112114
""".stripMargin
115+
// scalastyle:off println
113116
System.err.println(usage)
117+
// scalastyle:on println
114118
System.exit(exitCode)
115119
}
116120
}

core/src/main/scala/org/apache/spark/deploy/RRunner.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ object RRunner {
8585
}
8686
System.exit(returnCode)
8787
} else {
88+
// scalastyle:off println
8889
System.err.println("SparkR backend did not initialize in " + backendTimeout + " seconds")
90+
// scalastyle:on println
8991
System.exit(-1)
9092
}
9193
}

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ object SparkSubmit {
8282

8383
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
8484

85+
// scalastyle:off println
8586
// Exposed for testing
8687
private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
8788
private[spark] var printStream: PrintStream = System.err
@@ -102,11 +103,14 @@ object SparkSubmit {
102103
printStream.println("Type --help for more information.")
103104
exitFn(0)
104105
}
106+
// scalastyle:on println
105107

106108
def main(args: Array[String]): Unit = {
107109
val appArgs = new SparkSubmitArguments(args)
108110
if (appArgs.verbose) {
111+
// scalastyle:off println
109112
printStream.println(appArgs)
113+
// scalastyle:on println
110114
}
111115
appArgs.action match {
112116
case SparkSubmitAction.SUBMIT => submit(appArgs)
@@ -160,7 +164,9 @@ object SparkSubmit {
160164
// makes the message printed to the output by the JVM not very helpful. Instead,
161165
// detect exceptions with empty stack traces here, and treat them differently.
162166
if (e.getStackTrace().length == 0) {
167+
// scalastyle:off println
163168
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
169+
// scalastyle:on println
164170
exitFn(1)
165171
} else {
166172
throw e
@@ -178,7 +184,9 @@ object SparkSubmit {
178184
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
179185
if (args.isStandaloneCluster && args.useRest) {
180186
try {
187+
// scalastyle:off println
181188
printStream.println("Running Spark using the REST application submission protocol.")
189+
// scalastyle:on println
182190
doRunMain()
183191
} catch {
184192
// Fail over to use the legacy submission gateway
@@ -558,13 +566,15 @@ object SparkSubmit {
558566
sysProps: Map[String, String],
559567
childMainClass: String,
560568
verbose: Boolean): Unit = {
569+
// scalastyle:off println
561570
if (verbose) {
562571
printStream.println(s"Main class:\n$childMainClass")
563572
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
564573
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
565574
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
566575
printStream.println("\n")
567576
}
577+
// scalastyle:on println
568578

569579
val loader =
570580
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
@@ -592,8 +602,10 @@ object SparkSubmit {
592602
case e: ClassNotFoundException =>
593603
e.printStackTrace(printStream)
594604
if (childMainClass.contains("thriftserver")) {
605+
// scalastyle:off println
595606
printStream.println(s"Failed to load main class $childMainClass.")
596607
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
608+
// scalastyle:on println
597609
}
598610
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
599611
}
@@ -766,7 +778,9 @@ private[spark] object SparkSubmitUtils {
766778
brr.setRoot(repo)
767779
brr.setName(s"repo-${i + 1}")
768780
cr.add(brr)
781+
// scalastyle:off println
769782
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
783+
// scalastyle:on println
770784
}
771785
}
772786

@@ -829,7 +843,9 @@ private[spark] object SparkSubmitUtils {
829843
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
830844
val dd = new DefaultDependencyDescriptor(ri, false, false)
831845
dd.addDependencyConfiguration(ivyConfName, ivyConfName)
846+
// scalastyle:off println
832847
printStream.println(s"${dd.getDependencyId} added as a dependency")
848+
// scalastyle:on println
833849
md.addDependency(dd)
834850
}
835851
}
@@ -896,9 +912,11 @@ private[spark] object SparkSubmitUtils {
896912
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
897913
new File(alternateIvyCache, "jars")
898914
}
915+
// scalastyle:off println
899916
printStream.println(
900917
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
901918
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
919+
// scalastyle:on println
902920
// create a pattern matcher
903921
ivySettings.addMatcher(new GlobPatternMatcher)
904922
// create the dependency resolvers

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
7979
/** Default properties present in the currently defined defaults file. */
8080
lazy val defaultSparkProperties: HashMap[String, String] = {
8181
val defaultProperties = new HashMap[String, String]()
82+
// scalastyle:off println
8283
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
8384
Option(propertiesFile).foreach { filename =>
8485
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
8586
defaultProperties(k) = v
8687
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
8788
}
8889
}
90+
// scalastyle:on println
8991
defaultProperties
9092
}
9193

@@ -452,6 +454,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
452454
}
453455

454456
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
457+
// scalastyle:off println
455458
val outStream = SparkSubmit.printStream
456459
if (unknownParam != null) {
457460
outStream.println("Unknown/unsupported param " + unknownParam)
@@ -541,6 +544,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
541544
outStream.println("CLI options:")
542545
outStream.println(getSqlShellOptions())
543546
}
547+
// scalastyle:on println
544548

545549
SparkSubmit.exitFn(exitCode)
546550
}

core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package org.apache.spark.deploy.client
1919

2020
private[spark] object TestExecutor {
2121
def main(args: Array[String]) {
22+
// scalastyle:off println
2223
println("Hello world!")
24+
// scalastyle:on println
2325
while (true) {
2426
Thread.sleep(1000)
2527
}

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
5656
Utils.loadDefaultSparkProperties(conf, propertiesFile)
5757

5858
private def printUsageAndExit(exitCode: Int) {
59+
// scalastyle:off println
5960
System.err.println(
6061
"""
6162
|Usage: HistoryServer [options]
@@ -84,6 +85,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
8485
| spark.history.fs.updateInterval How often to reload log data from storage
8586
| (in seconds, default: 10)
8687
|""".stripMargin)
88+
// scalastyle:on println
8789
System.exit(exitCode)
8890
}
8991

0 commit comments

Comments
 (0)