Commit 789ea21

Merge commit with 2 parents: cb53a2c + 81b9d5b

51 files changed: +439, -162 lines changed


core/pom.xml

Lines changed: 6 additions & 12 deletions
@@ -306,26 +306,20 @@
       </plugin>
       <!-- Unzip py4j so we can include its files in the jar -->
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2.1</version>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
         <executions>
           <execution>
             <phase>generate-resources</phase>
             <goals>
-              <goal>exec</goal>
+              <goal>run</goal>
             </goals>
           </execution>
         </executions>
         <configuration>
-          <executable>unzip</executable>
-          <workingDirectory>../python</workingDirectory>
-          <arguments>
-            <argument>-o</argument>
-            <argument>lib/py4j*.zip</argument>
-            <argument>-d</argument>
-            <argument>build</argument>
-          </arguments>
+          <tasks>
+            <unzip src="../python/lib/py4j-0.8.2.1-src.zip" dest="../python/build" />
+          </tasks>
         </configuration>
       </plugin>
       <plugin>

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 3 additions & 0 deletions
@@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
       case JobFailed(e: Exception) => scala.util.Failure(e)
     }
   }
+
+  /** Get the corresponding job id for this action. */
+  def jobId = jobWaiter.jobId
 }
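Note: a minimal usage sketch for the new jobId accessor (not part of the diff; assumes an existing SparkContext named sc and an RDD[Int] named rdd with at least two partitions). SparkContext.submitJob returns a SimpleFutureAction, so the id of the submitted job is available immediately:

// Hypothetical example: submit a job asynchronously and read its job id.
val action = sc.submitJob(
  rdd,
  (it: Iterator[Int]) => it.sum,     // work to run per partition
  Seq(0, 1),                         // partitions to run on
  (index: Int, result: Int) => (),   // per-partition result handler (ignored here)
  ())                                // overall result value
println("submitted job " + action.jobId)
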
core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 2 additions & 2 deletions
@@ -294,7 +294,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   def checkUIViewPermissions(user: String): Boolean = {
     logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
       viewAcls.mkString(","))
-    if (aclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
+    !aclsEnabled || user == null || viewAcls.contains(user)
   }
 
   /**
@@ -309,7 +309,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   def checkModifyPermissions(user: String): Boolean = {
     logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
       modifyAcls.mkString(","))
-    if (aclsEnabled() && (user != null) && (!modifyAcls.contains(user))) false else true
+    !aclsEnabled || user == null || modifyAcls.contains(user)
   }
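Note: the rewritten checks are De Morgan simplifications of the old conditionals, not behavior changes. A minimal standalone sketch (hypothetical names, e.g. for the Scala REPL; not Spark code) that checks the two forms agree:

def oldCheck(acls: Boolean, user: String, allowed: Set[String]): Boolean =
  if (acls && (user != null) && (!allowed.contains(user))) false else true

def newCheck(acls: Boolean, user: String, allowed: Set[String]): Boolean =
  !acls || user == null || allowed.contains(user)

// The two forms return the same result for every combination of inputs.
for (acls <- Seq(true, false); user <- Seq(null, "alice", "bob"))
  assert(oldCheck(acls, user, Set("alice")) == newCheck(acls, user, Set("alice")))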

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 23 deletions
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
   ui.bind()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
+  val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
@@ -815,7 +796,7 @@
   * Add a file to be downloaded with this Spark job on every node.
   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
-   * use `SparkFiles.get(path)` to find its download location.
+   * use `SparkFiles.get(fileName)` to find its download location.
   */
  def addFile(path: String) {
    val uri = new URI(path)
@@ -827,7 +808,8 @@
     addedFiles(key) = System.currentTimeMillis
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
+    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+      hadoopConfiguration)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
@@ -1637,4 +1619,3 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassTag[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
-
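Note: the Scaladoc fix above matches how SparkFiles is meant to be used: addFile accepts a path or URI, but SparkFiles.get takes only the file name. A small sketch (hypothetical file name; assumes an existing SparkContext named sc):

sc.addFile("hdfs://namenode/data/lookup.txt")                    // full URI when adding
val localPath = org.apache.spark.SparkFiles.get("lookup.txt")    // bare file name when resolving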

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 14 additions & 1 deletion
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
+  /**
+   * :: Experimental ::
+   * The asynchronous version of the foreach action.
+   *
+   * @param f the function to apply to all the elements of the RDD
+   * @return a FutureAction for the action
+   */
+  @Experimental
+  def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
+    import org.apache.spark.SparkContext._
+    rdd.foreachAsync(x => f.call(x))
+  }
+
 }
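Note: a minimal usage sketch of the new async action, written against the Scala RDD API that the Java wrapper delegates to (hypothetical; assumes an existing RDD[String] named rdd):

import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.SparkContext._   // implicit conversion to AsyncRDDActions, as in the wrapper above

val future = rdd.foreachAsync(line => println(line))  // returns a FutureAction[Unit] without blocking
Await.ready(future, 10.minutes)                       // optionally wait for the job to finish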

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -545,7 +545,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   * Add a file to be downloaded with this Spark job on every node.
   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
-   * use `SparkFiles.get(path)` to find its download location.
+   * use `SparkFiles.get(fileName)` to find its download location.
   */
  def addFile(path: String) {
    sc.addFile(path)

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 3 additions & 19 deletions
@@ -28,6 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
 /**
  * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -201,29 +202,12 @@ private object TorrentBroadcast extends Logging {
   }
 
   def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
-    // TODO: Create a special ByteArrayOutputStream that splits the output directly into chunks
-    // so we don't need to do the extra memory copy.
-    val bos = new ByteArrayOutputStream()
+    val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
     val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
     val ser = SparkEnv.get.serializer.newInstance()
     val serOut = ser.serializeStream(out)
     serOut.writeObject[T](obj).close()
-    val byteArray = bos.toByteArray
-    val bais = new ByteArrayInputStream(byteArray)
-    val numBlocks = math.ceil(byteArray.length.toDouble / BLOCK_SIZE).toInt
-    val blocks = new Array[ByteBuffer](numBlocks)
-
-    var blockId = 0
-    for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {
-      val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)
-      val tempByteArray = new Array[Byte](thisBlockSize)
-      bais.read(tempByteArray, 0, thisBlockSize)
-
-      blocks(blockId) = ByteBuffer.wrap(tempByteArray)
-      blockId += 1
-    }
-    bais.close()
-    blocks
+    bos.toArrays.map(ByteBuffer.wrap)
   }
 
   def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = {
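Note: ByteArrayChunkOutputStream, referenced by the new import, is not shown in this diff. Conceptually it is an OutputStream that accumulates bytes in fixed-size chunks, so the serialized broadcast never has to be copied into one large array before being split into blocks. A rough standalone sketch of the idea (hypothetical class, not the actual org.apache.spark.util.io implementation):

import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

class ChunkedByteOutputStream(chunkSize: Int) extends OutputStream {
  private val chunks = ArrayBuffer[Array[Byte]]()
  private var pos = 0  // next write position inside the last chunk

  override def write(b: Int): Unit = {
    if (chunks.isEmpty || pos == chunkSize) {
      chunks += new Array[Byte](chunkSize)  // start a new fixed-size chunk
      pos = 0
    }
    chunks.last(pos) = b.toByte
    pos += 1
  }

  // Return the chunks, trimming the last one to the bytes actually written.
  def toArrays: Array[Array[Byte]] =
    if (chunks.isEmpty) Array.empty[Array[Byte]]
    else (chunks.init :+ chunks.last.take(pos)).toArray
}

// Example: 6 bytes with a chunk size of 4 yield chunks of length 4 and 2.
val out = new ChunkedByteOutputStream(4)
out.write(Array[Byte](1, 2, 3, 4, 5, 6))
assert(out.toArrays.map(_.length).toSeq == Seq(4, 2))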

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 35 additions & 4 deletions
@@ -24,15 +24,18 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
 
 import scala.collection.JavaConversions._
 
 /**
+ * :: DeveloperApi ::
  * Contains util methods to interact with Hadoop from Spark.
  */
+@DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration()
+  val conf: Configuration = newConfiguration(new SparkConf())
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -64,11 +67,39 @@
     }
   }
 
+  @Deprecated
+  def newConfiguration(): Configuration = newConfiguration(null)
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
    */
-  def newConfiguration(): Configuration = new Configuration()
+  def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+
+    // Note: this null check is around more than just access to the "conf" object to maintain
+    // the behavior of the old implementation of this code, for backwards compatibility.
+    if (conf != null) {
+      // Explicitly check for S3 environment variables
+      if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+          System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+        hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+        hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+        hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+        hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      }
+      // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+      conf.getAll.foreach { case (key, value) =>
+        if (key.startsWith("spark.hadoop.")) {
+          hadoopConf.set(key.substring("spark.hadoop.".length), value)
+        }
+      }
+      val bufferSize = conf.get("spark.buffer.size", "65536")
+      hadoopConf.set("io.file.buffer.size", bufferSize)
+    }
+
+    hadoopConf
+  }
 
   /**
    * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +117,7 @@
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
-   def loginUserFromKeytab(principalName: String, keytabFilename: String) {
+  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }
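Note: a small usage sketch of the new overload (hypothetical key, not from this commit). Any spark.hadoop.* entry on the SparkConf lands in the returned Hadoop Configuration with the prefix stripped, and the deprecated no-arg overload now just delegates with a null conf:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

val sparkConf = new SparkConf()
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")       // hypothetical setting
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
assert(hadoopConf.get("fs.defaultFS") == "hdfs://namenode:8020")  // prefix stripped
assert(hadoopConf.get("io.file.buffer.size") == "65536")          // default taken from spark.buffer.size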

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     .map { d => Utils.resolveURI(d) }
     .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
 
-  private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
+  private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
+    SparkHadoopUtil.get.newConfiguration(conf))
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 2 deletions
@@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
+  SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
@@ -673,7 +674,8 @@ private[spark] class Master(
       app.desc.appUiUrl = notFoundBasePath
       return false
     }
-    val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
+    val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+      SparkHadoopUtil.get.newConfiguration(conf))
     val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
     val eventLogPaths = eventLogInfo.logPaths
     val compressionCodec = eventLogInfo.compressionCodec
