
Commit 043e786

Merge remote-tracking branch 'upstream/master' into ldaonline
Conflicts: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
2 parents: d640d9c + 6d3b7cb

63 files changed (+2110, -927 lines)


build/mvn

Lines changed: 5 additions & 5 deletions
@@ -34,14 +34,14 @@ install_app() {
   local binary="${_DIR}/$3"
 
   # setup `curl` and `wget` silent options if we're running on Jenkins
-  local curl_opts=""
+  local curl_opts="-L"
   local wget_opts=""
   if [ -n "$AMPLAB_JENKINS" ]; then
-    curl_opts="-s"
-    wget_opts="--quiet"
+    curl_opts="-s ${curl_opts}"
+    wget_opts="--quiet ${wget_opts}"
   else
-    curl_opts="--progress-bar"
-    wget_opts="--progress=bar:force"
+    curl_opts="--progress-bar ${curl_opts}"
+    wget_opts="--progress=bar:force ${wget_opts}"
   fi
 
   if [ -z "$3" -o ! -f "$binary" ]; then

core/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -132,6 +132,13 @@
       <artifactId>jetty-servlet</artifactId>
       <scope>compile</scope>
     </dependency>
+    <!-- Because we mark jetty as provided and shade it, its dependency
+         orbit is ignored, so we explicitly list it here (see SPARK-5557).-->
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet</artifactId>
+      <version>${orbit.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.commons</groupId>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 56 additions & 12 deletions
@@ -25,29 +25,37 @@ import java.net.URI
 import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
+
 import scala.collection.{Map, Set}
 import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
+
+import akka.actor.Props
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
 import org.apache.mesos.MesosNativeLibrary
-import akka.actor.Props
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
-import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
+  FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+  SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
@@ -1016,12 +1024,48 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
    * use `SparkFiles.get(fileName)` to find its download location.
    */
-  def addFile(path: String) {
+  def addFile(path: String): Unit = {
+    addFile(path, false)
+  }
+
+  /**
+   * Add a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+   * use `SparkFiles.get(fileName)` to find its download location.
+   *
+   * A directory can be given if the recursive option is set to true. Currently directories are only
+   * supported for Hadoop-supported filesystems.
+   */
+  def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new URI(path)
-    val key = uri.getScheme match {
-      case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case "local" => "file:" + uri.getPath
-      case _ => path
+    val schemeCorrectedPath = uri.getScheme match {
+      case null | "local" => "file:" + uri.getPath
+      case _ => path
+    }
+
+    val hadoopPath = new Path(schemeCorrectedPath)
+    val scheme = new URI(schemeCorrectedPath).getScheme
+    if (!Array("http", "https", "ftp").contains(scheme)) {
+      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
+      if (!fs.exists(hadoopPath)) {
+        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
+      }
+      val isDir = fs.isDirectory(hadoopPath)
+      if (!isLocal && scheme == "file" && isDir) {
+        throw new SparkException(s"addFile does not support local directories when not running " +
+          "local mode.")
+      }
+      if (!recursive && isDir) {
+        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
+          "turned on.")
+      }
+    }
+
+    val key = if (!isLocal && scheme == "file") {
+      env.httpFileServer.addFile(new File(uri.getPath))
+    } else {
+      schemeCorrectedPath
     }
     val timestamp = System.currentTimeMillis
     addedFiles(key) = timestamp
@@ -1633,8 +1677,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val schedulingMode = getSchedulingMode.toString
     val addedJarPaths = addedJars.keys.toSeq
     val addedFilePaths = addedFiles.keys.toSeq
-    val environmentDetails =
-      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
+    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
+      addedFilePaths)
     val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
     listenerBus.post(environmentUpdate)
   }
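
For context, a minimal driver-side sketch of how the new recursive addFile overload can be used; the app name, master, and paths below are illustrative and not part of this commit:

// Hypothetical usage of the two addFile overloads shown above.
val conf = new org.apache.spark.SparkConf().setAppName("addFile-demo").setMaster("local[2]")
val sc = new org.apache.spark.SparkContext(conf)

// Single file: same behaviour as the original addFile(path).
sc.addFile("hdfs:///tmp/lookup.csv")

// Whole directory on a Hadoop-compatible filesystem; requires recursive = true,
// otherwise the new check throws a SparkException for directories.
sc.addFile("hdfs:///tmp/config-dir", recursive = true)

// On executors, the downloaded copy is resolved by file name via SparkFiles.
val localPath = org.apache.spark.SparkFiles.get("lookup.csv")

Note that the directory case is validated through the Hadoop FileSystem API, which is why HTTP, HTTPS, and FTP URIs are excluded from the recursive path.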

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 16 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
     val method = context.getClass.getMethod("getConfiguration")
     method.invoke(context).asInstanceOf[Configuration]
   }
+
+  /**
+   * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+   * given path points to a file, return a single-element collection containing [[FileStatus]] of
+   * that file.
+   */
+  def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+    def recurse(path: Path) = {
+      val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
+      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+    }
+
+    val baseStatus = fs.getFileStatus(basePath)
+    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+  }
 }
 
 object SparkHadoopUtil {
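
A short sketch of how the new listLeafStatuses helper might be called; the base path is illustrative, and SparkHadoopUtil.get is assumed to return the active instance:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil

// Recursively collect every file (leaf) under a base path and print its size.
val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, new Path("/tmp/some-dir"))
leaves.foreach(status => println(s"${status.getPath} -> ${status.getLen} bytes"))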

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 73 additions & 15 deletions
@@ -386,8 +386,10 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file to target directory. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   * Download a file or directory to target directory. Supports fetching the file in a variety of
+   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
+   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
+   * filesystems.
    *
    * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
    * across executors running the same application. `useCache` is used mainly for
@@ -456,17 +458,18 @@ private[spark] object Utils extends Logging {
    *
    * @param url URL that `sourceFile` originated from, for logging purposes.
    * @param in InputStream to download.
-   * @param tempFile File path to download `in` to.
    * @param destFile File path to move `tempFile` to.
    * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
    *                      `sourceFile`
    */
   private def downloadFile(
       url: String,
       in: InputStream,
-      tempFile: File,
       destFile: File,
       fileOverwrite: Boolean): Unit = {
+    val tempFile = File.createTempFile("fetchFileTemp", null,
+      new File(destFile.getParentFile.getAbsolutePath))
+    logInfo(s"Fetching $url to $tempFile")
 
     try {
       val out = new FileOutputStream(tempFile)
@@ -505,7 +508,7 @@ private[spark] object Utils extends Logging {
       removeSourceFile: Boolean = false): Unit = {
 
     if (destFile.exists) {
-      if (!Files.equal(sourceFile, destFile)) {
+      if (!filesEqualRecursive(sourceFile, destFile)) {
         if (fileOverwrite) {
           logInfo(
             s"File $destFile exists and does not match contents of $url, replacing it with $url"
@@ -540,13 +543,44 @@ private[spark] object Utils extends Logging {
       Files.move(sourceFile, destFile)
     } else {
       logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
-      Files.copy(sourceFile, destFile)
+      copyRecursive(sourceFile, destFile)
+    }
+  }
+
+  private def filesEqualRecursive(file1: File, file2: File): Boolean = {
+    if (file1.isDirectory && file2.isDirectory) {
+      val subfiles1 = file1.listFiles()
+      val subfiles2 = file2.listFiles()
+      if (subfiles1.size != subfiles2.size) {
+        return false
+      }
+      subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall {
+        case (f1, f2) => filesEqualRecursive(f1, f2)
+      }
+    } else if (file1.isFile && file2.isFile) {
+      Files.equal(file1, file2)
+    } else {
+      false
+    }
+  }
+
+  private def copyRecursive(source: File, dest: File): Unit = {
+    if (source.isDirectory) {
+      if (!dest.mkdir()) {
+        throw new IOException(s"Failed to create directory ${dest.getPath}")
+      }
+      val subfiles = source.listFiles()
+      subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName)))
+    } else {
+      Files.copy(source, dest)
     }
   }
 
   /**
-   * Download a file to target directory. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   * Download a file or directory to target directory. Supports fetching the file in a variety of
+   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
+   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
+   * filesystems.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
@@ -558,14 +592,11 @@ private[spark] object Utils extends Logging {
       conf: SparkConf,
       securityMgr: SecurityManager,
       hadoopConf: Configuration) {
-    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
     Option(uri.getScheme).getOrElse("file") match {
       case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + tempFile)
-
         var uc: URLConnection = null
         if (securityMgr.isAuthenticationEnabled()) {
           logDebug("fetchFile with security enabled")
@@ -583,17 +614,44 @@ private[spark] object Utils extends Logging {
         uc.setReadTimeout(timeout)
         uc.connect()
         val in = uc.getInputStream()
-        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
+        downloadFile(url, in, targetFile, fileOverwrite)
       case "file" =>
        // In the case of a local file, copy the local file to the target directory.
        // Note the difference between uri vs url.
        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
        copyFile(url, sourceFile, targetFile, fileOverwrite)
      case _ =>
-        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
        val fs = getHadoopFileSystem(uri, hadoopConf)
-        val in = fs.open(new Path(uri))
-        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
+        val path = new Path(uri)
+        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+    }
+  }
+
+  /**
+   * Fetch a file or directory from a Hadoop-compatible filesystem.
+   *
+   * Visible for testing
+   */
+  private[spark] def fetchHcfsFile(
+      path: Path,
+      targetDir: File,
+      fs: FileSystem,
+      conf: SparkConf,
+      hadoopConf: Configuration,
+      fileOverwrite: Boolean): Unit = {
+    if (!targetDir.mkdir()) {
+      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
+    }
+    fs.listStatus(path).foreach { fileStatus =>
+      val innerPath = fileStatus.getPath
+      if (fileStatus.isDir) {
+        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
+          fileOverwrite)
+      } else {
+        val in = fs.open(innerPath)
+        val targetFile = new File(targetDir, innerPath.getName)
+        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+      }
    }
  }
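
Because copyRecursive and filesEqualRecursive are private to Utils, here is a standalone sketch of the same idea in plain JDK/Scala (not Spark API), useful for reasoning about how directory fetches are copied and compared:

import java.io.{File, IOException}
import java.nio.file.Files

// Copy a file or a directory tree, mirroring the copyRecursive helper above.
def copyTree(src: File, dst: File): Unit = {
  if (src.isDirectory) {
    if (!dst.mkdir()) throw new IOException(s"Failed to create directory ${dst.getPath}")
    src.listFiles().foreach(f => copyTree(f, new File(dst, f.getName)))
  } else {
    Files.copy(src.toPath, dst.toPath) // single regular file
  }
}

// Compare two trees by structure and content, mirroring filesEqualRecursive.
def treesEqual(a: File, b: File): Boolean = {
  if (a.isDirectory && b.isDirectory) {
    val as = a.listFiles().sortBy(_.getName)
    val bs = b.listFiles().sortBy(_.getName)
    as.length == bs.length && as.zip(bs).forall { case (x, y) => treesEqual(x, y) }
  } else if (a.isFile && b.isFile) {
    java.util.Arrays.equals(Files.readAllBytes(a.toPath), Files.readAllBytes(b.toPath))
  } else {
    false
  }
}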
