@@ -56,9 +56,9 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 import org.apache.spark.util.Utils
 
 private[spark] class Client(
-  val args : ClientArguments,
-  val hadoopConf : Configuration,
-  val sparkConf : SparkConf)
+    val args : ClientArguments,
+    val hadoopConf : Configuration,
+    val sparkConf : SparkConf)
   extends Logging {
 
   import Client._
@@ -122,8 +122,8 @@ private[spark] class Client(
    * This uses the YarnClientApplication not available in the Yarn alpha API.
    */
   def createApplicationSubmissionContext(
-    newApp : YarnClientApplication,
-    containerContext : ContainerLaunchContext): ApplicationSubmissionContext = {
+      newApp : YarnClientApplication,
+      containerContext : ContainerLaunchContext): ApplicationSubmissionContext = {
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(args.appName)
     appContext.setQueue(args.amQueue)
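A side note for readers who only see this hunk: everything set on appContext is plain YARN client API. A minimal, self-contained sketch of the same calls; the application name, queue, and memory values here are placeholders, not values from this patch:

import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()

val newApp = yarnClient.createApplication()      // YarnClientApplication, as in the hunk
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName("my-spark-app")    // the patch uses args.appName
appContext.setQueue("default")                   // the patch uses args.amQueue

val capability = Records.newRecord(classOf[Resource])
capability.setMemory(1024)                       // MB for the AM container
appContext.setResource(capability)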
@@ -190,9 +190,9 @@ private[spark] class Client(
    * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
    */
   private[yarn] def copyFileToRemote(
-    destDir : Path,
-    srcPath : Path,
-    replication : Short): Path = {
+      destDir : Path,
+      srcPath : Path,
+      replication : Short): Path = {
     val destFs = destDir.getFileSystem(hadoopConf)
     val srcFs = srcPath.getFileSystem(hadoopConf)
     var destPath = srcPath
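For reference, copyFileToRemote (only its signature is touched here) boils down to the stock Hadoop FileSystem API. A rough sketch under a hypothetical copyToRemote name; the real method also skips the copy when source and destination filesystems match:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

def copyToRemote(destDir: Path, srcPath: Path, replication: Short, conf: Configuration): Path = {
  val destFs = destDir.getFileSystem(conf)
  val srcFs = srcPath.getFileSystem(conf)
  val destPath = new Path(destDir, srcPath.getName)
  FileUtil.copy(srcFs, srcPath, destFs, destPath, false /* deleteSource */, conf)
  destFs.setReplication(destPath, replication)
  destFs.makeQualified(destPath)  // return a fully qualified path
}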
@@ -462,7 +462,7 @@ private[spark] class Client(
 
     // Keep this for backwards compatibility but users should move to the config
     sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
-    // Allow users to specify some environment variables.
+      // Allow users to specify some environment variables.
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
       // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
       env("SPARK_YARN_USER_ENV") = userEnvs
@@ -522,7 +522,7 @@ private[spark] class Client(
    * This sets up the launch environment, java options, and the command for launching the AM.
    */
   private def createContainerLaunchContext(newAppResponse : GetNewApplicationResponse)
-      : ContainerLaunchContext = {
+    : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
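The body that follows this hunk fills in a ContainerLaunchContext record. The bare YARN boilerplate it wraps looks roughly like the sketch below; the environment entry and command string are placeholders:

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setEnvironment(Map("SPARK_YARN_MODE" -> "true").asJava)    // launch environment
amContainer.setCommands(Seq("<java command assembled below>").asJava)  // AM command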
@@ -661,14 +661,14 @@ private[spark] class Client(
     val amArgs =
       Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
         userArgs ++ Seq(
-        "--executor-memory", args.executorMemory.toString + "m",
-        "--executor-cores", args.executorCores.toString,
-        "--num-executors", args.numExecutors.toString)
+          "--executor-memory", args.executorMemory.toString + "m",
+          "--executor-cores", args.executorCores.toString,
+          "--num-executors", args.numExecutors.toString)
 
     // Command for the ApplicationMaster
     val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
-    ) ++
+        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
+      ) ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -728,9 +728,9 @@ private[spark] class Client(
    * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
-    appId : ApplicationId,
-    returnOnRunning : Boolean = false,
-    logApplicationReport : Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
+      appId : ApplicationId,
+      returnOnRunning : Boolean = false,
+      logApplicationReport : Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
     var lastState : YarnApplicationState = null
     while (true) {
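A minimal polling loop in the spirit of monitorApplication, against an already-started YarnClient. Sketch only: the real method also logs each application report and returns early when returnOnRunning is set, and awaitFinalState is a hypothetical name:

import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

def awaitFinalState(
    yarnClient: YarnClient,
    appId: ApplicationId,
    intervalMs: Long = 1000): (YarnApplicationState, FinalApplicationStatus) = {
  while (true) {
    Thread.sleep(intervalMs)  // the patch reads this from spark.yarn.report.interval
    val report = yarnClient.getApplicationReport(appId)
    val state = report.getYarnApplicationState
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return (state, report.getFinalApplicationStatus)
    }
  }
  throw new IllegalStateException("unreachable")
}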
@@ -1085,7 +1085,7 @@ object Client extends Logging {
     val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
     val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
 
-    val hiveConfGet = (param: String) => Option(hiveConfClass
+    val hiveConfGet = (param : String) => Option(hiveConfClass
       .getMethod("get", classOf[java.lang.String])
       .invoke(hiveConf, param))
 
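The hiveConfGet closure touched above is one instance of a general pattern: call a method reflectively so Hive remains an optional dependency. A distilled sketch using only JDK reflection, under the hypothetical name reflectiveGetter:

// Given any conf-like object exposing a public get(String), build a safe getter.
def reflectiveGetter(conf: AnyRef): String => Option[AnyRef] = {
  val get = conf.getClass.getMethod("get", classOf[java.lang.String])
  (param: String) => Option(get.invoke(conf, param))  // Option absorbs nulls
}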
@@ -1107,7 +1107,7 @@ object Client extends Logging {
 
       val hive2Token = new Token[DelegationTokenIdentifier]()
       hive2Token.decodeFromUrlString(tokenStr)
-      credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+      credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
       logDebug("Added hive.Server2.delegation.token to conf.")
       hiveClass.getMethod("closeCurrent").invoke(null)
     } else {
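The token plumbing in this hunk is public Hadoop security API. A self-contained round trip of the same decode-and-register steps; it substitutes Hadoop's AbstractDelegationTokenIdentifier for Hive's DelegationTokenIdentifier so it compiles without Hive, and the encoded string is a stand-in for what Hive's getDelegationToken returns:

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// Encode an empty token just to have a well-formed string to decode.
val tokenStr = new Token[AbstractDelegationTokenIdentifier]().encodeToUrlString()

val hive2Token = new Token[AbstractDelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
val credentials = new Credentials()
credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)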
@@ -1152,13 +1152,13 @@ object Client extends Logging {
 
       logInfo("Added HBase security token to credentials.")
     } catch {
-      case e: java.lang.NoSuchMethodException =>
+      case e : java.lang.NoSuchMethodException =>
         logInfo("HBase Method not found: " + e)
-      case e: java.lang.ClassNotFoundException =>
+      case e : java.lang.ClassNotFoundException =>
         logDebug("HBase Class not found: " + e)
-      case e: java.lang.NoClassDefFoundError =>
+      case e : java.lang.NoClassDefFoundError =>
         logDebug("HBase Class not found: " + e)
-      case e: Exception =>
+      case e : Exception =>
         logError("Exception when obtaining HBase security token: " + e)
     }
   }
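For context on what these catch cases guard: the HBase token is obtained reflectively so HBase stays optional, and each exception above corresponds to a way that lookup can fail. A hedged sketch of the guarded lookup, assuming the TokenUtil.obtainToken(Configuration) entry point of HBase 0.98/1.x; obtainHBaseToken is a hypothetical name:

import org.apache.hadoop.conf.Configuration

def obtainHBaseToken(conf: Configuration): Option[AnyRef] =
  try {
    val tokenUtil = Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
    val obtainToken = tokenUtil.getMethod("obtainToken", classOf[Configuration])
    Option(obtainToken.invoke(null, conf))  // static call; add the result to credentials as above
  } catch {
    case _: ClassNotFoundException => None  // HBase not on the classpath
    case _: NoSuchMethodException => None   // incompatible HBase version
  }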