
Commit f28aaeb

Merge pull request apache#24 from sven0726/2.2-xspark
spark 2.2.0: support AK/SK injection
2 parents f61120a + 7405577 commit f28aaeb


1 file changed: 32 additions, 15 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 32 additions & 15 deletions
@@ -84,21 +84,11 @@ class SparkHadoopUtil extends Logging {
     // the behavior of the old implementation of this code, for backwards compatibility.
     if (conf != null) {
       // Explicitly check for S3 environment variables
-      val keyId = System.getenv("AWS_ACCESS_KEY_ID")
-      val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
-      if (keyId != null && accessKey != null) {
-        hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
-        hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
-        hadoopConf.set("fs.s3a.access.key", keyId)
-        hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
-        hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
-        hadoopConf.set("fs.s3a.secret.key", accessKey)
-
-        val sessionToken = System.getenv("AWS_SESSION_TOKEN")
-        if (sessionToken != null) {
-          hadoopConf.set("fs.s3a.session.token", sessionToken)
-        }
-      }
+      initWithAWSConf(hadoopConf)
+
+      // Check for Qiniu AK SK
+      initWithQiniuConf(hadoopConf)
+
       // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
       conf.getAll.foreach { case (key, value) =>
         if (key.startsWith("spark.hadoop.")) {
@@ -110,6 +100,33 @@ class SparkHadoopUtil extends Logging {
     }
   }

+  private def initWithAWSConf(hadoopConf: Configuration) = {
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+      System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      val keyId = System.getenv("AWS_ACCESS_KEY_ID")
+      val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
+
+      hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
+      hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
+      hadoopConf.set("fs.s3a.access.key", keyId)
+      hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
+      hadoopConf.set("fs.s3a.secret.key", accessKey)
+    }
+  }
+
+  private def initWithQiniuConf(hadoopConf: Configuration) = {
+    if (System.getenv("QINIU_ACCESS_KEY") != null &&
+      System.getenv("QINIU_SECRET_KEY") != null) {
+      val accessKey = System.getenv("QINIU_ACCESS_KEY")
+      val secretKey = System.getenv("QINIU_SECRET_KEY")
+
+      hadoopConf.set("fs.qiniu.access.key", accessKey)
+      hadoopConf.set("fs.qiniu.secret.key", secretKey)
+      logInfo("Success init conf with qiniu ak sk.")
+    }
+  }
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
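
For context, a minimal driver-side sketch of how the injected credentials could be observed, assuming a Spark 2.2 build that includes this patch. QiniuConfCheck is a hypothetical illustration, not part of the commit; it only relies on SparkHadoopUtil.get.newConfiguration, which builds the Hadoop configuration that the changed method populates.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical check, not part of this commit. Run with the Qiniu AK/SK
// exported in the launching environment, for example:
//   export QINIU_ACCESS_KEY=...
//   export QINIU_SECRET_KEY=...
object QiniuConfCheck {
  def main(args: Array[String]): Unit = {
    // newConfiguration builds a Hadoop Configuration from a SparkConf; with this
    // patch applied it should also pick up the Qiniu credentials from the env.
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(new SparkConf())

    // Compare the injected Hadoop keys against the environment without printing them.
    assert(hadoopConf.get("fs.qiniu.access.key") == sys.env("QINIU_ACCESS_KEY"))
    assert(hadoopConf.get("fs.qiniu.secret.key") == sys.env("QINIU_SECRET_KEY"))
    println("fs.qiniu.* credentials were injected from the environment")
  }
}

The same pattern applies to the AWS variables: if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set, the fs.s3/fs.s3n/fs.s3a keys are filled in by initWithAWSConf instead of the previous inline block.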
