@@ -24,26 +24,48 @@ import org.apache.spark.internal.Logging
 
 object HDFSConfig extends Logging {
 
-  private def downloadFile(url: String, fileToDownload: String, outputhPath: String) {
+  private def downloadFile(url: String,
+                           fileToDownload: String,
+                           outputhPath: String,
+                           connectTimeout: Int,
+                           readTimeout: Int) {
     logInfo(s"extracting $url/$fileToDownload")
     new File(outputhPath).mkdirs
-    val src = scala.io.Source.fromURL(s"$url/$fileToDownload")
+    val src = get(s"$url/$fileToDownload", connectTimeout, readTimeout).mkString("")
     val downloadFile = Files.createFile(Paths.get(s"$outputhPath/$fileToDownload"),
       PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-------")))
     downloadFile.toFile.deleteOnExit() // just to be sure
-    Files.write(downloadFile, src.mkString("").getBytes)
+    Files.write(downloadFile, src.getBytes)
   }
 
+  private def get(url: String,
+                  connectTimeout: Int,
+                  readTimeout: Int): String = {
+    import java.net.{URL, HttpURLConnection}
+    val requestMethod = "GET"
+    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
+    connection.setConnectTimeout(connectTimeout)
+    connection.setReadTimeout(readTimeout)
+    connection.setRequestMethod(requestMethod)
+    val inputStream = connection.getInputStream
+    val content = scala.io.Source.fromInputStream(inputStream).mkString("")
+    if (inputStream != null) inputStream.close
+    content
+  }
+
+
   def prepareEnviroment(options: Map[String, String]): Map[String, String] = {
     require(options.get("HDFS_CONF_URI").isDefined,
       "a proper HDFS URI must be configured to get Hadoop Configuration")
     require(sys.env.get("HADOOP_CONF_DIR").isDefined,
       "a proper Hadoop Conf Dir must be configured to store Hadoop Configuration")
+    val connectTimeout: Int = options.get("HDFS_CONF_CONNECT_TIMEOUT").getOrElse("5000").toInt
+    val readTimeout: Int = options.get("HDFS_CONF_READ_TIMEOUT").getOrElse("5000").toInt
     val hadoopConfUri = options.get("HDFS_CONF_URI").get
     val hadoopConfDir = sys.env.get("HADOOP_CONF_DIR").get
-    downloadFile(hadoopConfUri, "core-site.xml", hadoopConfDir)
-    downloadFile(hadoopConfUri, "hdfs-site.xml", hadoopConfDir)
-    downloadFile(hadoopConfUri, "krb5.conf", "/etc/")
+    downloadFile(hadoopConfUri, "core-site.xml", hadoopConfDir, connectTimeout, readTimeout)
+    downloadFile(hadoopConfUri, "hdfs-site.xml", hadoopConfDir, connectTimeout, readTimeout)
+    downloadFile(hadoopConfUri, "krb5.conf", "/etc/", connectTimeout, readTimeout)
     Map.empty[String, String]
   }
-}
+}
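
For context, here is a minimal sketch of a call site exercising the new options. The keys HDFS_CONF_URI, HDFS_CONF_CONNECT_TIMEOUT, and HDFS_CONF_READ_TIMEOUT and the 5000 ms defaults come from the patch above; the endpoint, timeout values, and surrounding setup are assumptions for illustration only.

// Hypothetical call site, not part of the patch.
val options = Map(
  "HDFS_CONF_URI" -> "http://namenode.example.com:50070/conf", // assumed endpoint
  "HDFS_CONF_CONNECT_TIMEOUT" -> "10000", // overrides the 5000 ms default
  "HDFS_CONF_READ_TIMEOUT" -> "10000"     // overrides the 5000 ms default
)
// HADOOP_CONF_DIR must be set in the environment; prepareEnviroment enforces
// both preconditions with require() before fetching core-site.xml,
// hdfs-site.xml and krb5.conf with the configured timeouts.
HDFSConfig.prepareEnviroment(options)

Both timeouts are in milliseconds (they are passed straight through to HttpURLConnection.setConnectTimeout and setReadTimeout), so a hung configuration endpoint can no longer block prepareEnviroment indefinitely.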