diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index da7850f766fab..2946af70984c9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, Logging}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class CoarseGrainedExecutorBackend(
@@ -60,12 +61,14 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Slave registration failed: " + message)
System.exit(1)
- case LaunchTask(taskDesc) =>
- logInfo("Got assigned task " + taskDesc.taskId)
+ case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val taskDesc = ser.deserialize[TaskDescription](data.value)
+ logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index b8aaa097e9b99..90f220c589f72 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -29,7 +29,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object CoarseGrainedClusterMessages {
// Driver to executors
- case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+ case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2223236f4c3a4..dc5c95f1236ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,12 +27,12 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{SparkException, SparkEnv, Logging, TaskState}
import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -50,6 +50,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
var totalCoreCount = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
+ private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -139,8 +140,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- freeCores(task.executorId) -= 1
- executorActor(task.executorId) ! LaunchTask(task)
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val serializedTask = ser.serialize(task)
+ if (serializedTask.limit >= akkaFrameSize - 1024) {
+ val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+ scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+ try {
+ var msg = "Serialized task %s:%d was %d bytes which " +
+ "exceeds spark.akka.frameSize (%d bytes). " +
+ "Consider using broadcast variables for large values."
+ msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+ taskSet.abort(msg)
+ } catch {
+ case e: Exception => logError("Exception in error callback", e)
+ }
+ }
+ }
+ else {
+ freeCores(task.executorId) -= 1
+ executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+ }
}
}
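
The hunk above serializes each TaskDescription with the closure serializer and aborts the task set instead of sending a message that cannot fit in an Akka frame. A minimal standalone sketch of that size guard; only the message format and the 1024-byte margin come from the patch, the helper name and its boolean return are illustrative:

    import java.nio.ByteBuffer

    // Returns true when the task is small enough to send as a LaunchTask message.
    // The 1024-byte margin leaves room for the Akka envelope around the payload.
    def fitsInFrame(taskId: Long, index: Int, serializedTask: ByteBuffer,
        akkaFrameSize: Int): Boolean = {
      if (serializedTask.limit >= akkaFrameSize - 1024) {
        val msg = ("Serialized task %s:%d was %d bytes which exceeds " +
          "spark.akka.frameSize (%d bytes). Consider using broadcast variables " +
          "for large values.").format(taskId, index, serializedTask.limit, akkaFrameSize)
        println(msg) // the real code aborts the TaskSet with this message instead
        false
      } else {
        true // the caller wraps the buffer in SerializableBuffer and sends LaunchTask
      }
    }
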
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f368179bbfbd..b4e682412029b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
+import scala.collection.BufferedIterator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -230,7 +231,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
- private val inputStreams = Seq(sortedMap) ++ spilledMaps
+ private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
@@ -245,13 +246,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
- private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+ private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
- while (it.hasNext && kc._1.hashCode() == minHash) {
+ while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
@@ -324,7 +325,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
- private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+ private class StreamBuffer(
+ val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {
def isEmpty = pairs.length == 0
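
Context for the BufferedIterator change above: the old loop compared the hash of the pair it had already consumed, so the first pair of the next key group was silently pulled into the current group. With `it.head` the next pair is inspected without being consumed. A small standalone illustration of that peek behaviour (not ExternalAppendOnlyMap code):

    // Collect leading elements that share the first element's hash code, leaving
    // the first element of the next group unread in the iterator.
    def takeSameHash[T](it: BufferedIterator[T]): Seq[T] = {
      val out = scala.collection.mutable.ArrayBuffer.empty[T]
      if (it.hasNext) {
        out += it.next()
        val minHash = out.head.hashCode()
        while (it.hasNext && it.head.hashCode() == minHash) { // peek, do not consume
          out += it.next()
        }
      }
      out
    }

    // "Aa" and "BB" collide on String.hashCode, "cc" does not:
    val it = Iterator("Aa", "BB", "cc").buffered
    assert(takeSameHash(it) == Seq("Aa", "BB"))
    assert(it.next() == "cc") // still available for the next group
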
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000..efef9d26dadca
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+ test("serialized task larger than akka frame size") {
+ val conf = new SparkConf
+ conf.set("spark.akka.frameSize","1")
+ conf.set("spark.default.parallelism","1")
+ sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+ val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+ val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+ val larger = sc.parallelize(Seq(buffer))
+ val thrown = intercept[SparkException] {
+ larger.collect()
+ }
+ assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+ val smaller = sc.parallelize(1 to 4).collect()
+ assert(smaller.size === 4)
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fce1184d46364..8675e0da0bafa 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
("pomatoes", "eructation") // 568647356
)
+ collisionPairs.foreach { case (w1, w2) =>
+ // String.hashCode is documented to use a specific algorithm, but check just in case
+ assert(w1.hashCode === w2.hashCode)
+ }
+
(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
- assert(count == 100000 + collisionPairs.size * 2)
+ assert(count === 100000 + collisionPairs.size * 2)
+ }
+
+ test("spilling with many hash collisions") {
+ val conf = new SparkConf(true)
+ conf.set("spark.shuffle.memoryFraction", "0.0001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+ val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+
+ // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+ // problems if the map fails to group together the objects with the same code (SPARK-2043).
+ for (i <- 1 to 10) {
+ for (j <- 1 to 10000) {
+ map.insert(FixedHashObject(j, j % 2), 1)
+ }
+ }
+
+ val it = map.iterator
+ var count = 0
+ while (it.hasNext) {
+ val kv = it.next()
+ assert(kv._2 === 10)
+ count += 1
+ }
+ assert(count === 10000)
}
test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
}
}
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(v: Int, h: Int) extends Serializable {
+ override def hashCode(): Int = h
+}
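
FixedHashObject exists to manufacture exactly the situation the merge logic must tolerate: distinct keys that share a hash code. A condensed restatement of the idea:

    case class FixedHashObject(v: Int, h: Int) { override def hashCode(): Int = h }

    val a = FixedHashObject(1, 0)
    val b = FixedHashObject(2, 0)
    assert(a.hashCode == b.hashCode) // forced collision on the hash code
    assert(a != b)                   // but the keys remain distinct
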
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index a8552a000bb74..cfc61b20100c6 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -194,7 +194,12 @@ def get_spark_ami(opts):
"c3.xlarge": "pvm",
"c3.2xlarge": "pvm",
"c3.4xlarge": "pvm",
- "c3.8xlarge": "pvm"
+ "c3.8xlarge": "pvm",
+ "r3.large": "hvm",
+ "r3.xlarge": "hvm",
+ "r3.2xlarge": "hvm",
+ "r3.4xlarge": "hvm",
+ "r3.8xlarge": "hvm"
}
if opts.instance_type in instance_types:
instance_type = instance_types[opts.instance_type]
@@ -496,7 +501,12 @@ def get_num_disks(instance_type):
"c3.xlarge": 2,
"c3.2xlarge": 2,
"c3.4xlarge": 2,
- "c3.8xlarge": 2
+ "c3.8xlarge": 2,
+ "r3.large": 1,
+ "r3.xlarge": 1,
+ "r3.2xlarge": 1,
+ "r3.4xlarge": 1,
+ "r3.8xlarge": 2
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ce3ef47cfe4bc..4d5db1d9954bc 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -61,7 +61,7 @@ class SparkFlumeEvent() extends Externalizable {
def readExternal(in: ObjectInput) {
val bodyLength = in.readInt()
val bodyBuff = new Array[Byte](bodyLength)
- in.read(bodyBuff)
+ in.readFully(bodyBuff)
val numHeaders = in.readInt()
val headers = new java.util.HashMap[CharSequence, CharSequence]
@@ -69,12 +69,12 @@ class SparkFlumeEvent() extends Externalizable {
for (i <- 0 until numHeaders) {
val keyLength = in.readInt()
val keyBuff = new Array[Byte](keyLength)
- in.read(keyBuff)
+ in.readFully(keyBuff)
val key : String = Utils.deserialize(keyBuff)
val valLength = in.readInt()
val valBuff = new Array[Byte](valLength)
- in.read(valBuff)
+ in.readFully(valBuff)
val value : String = Utils.deserialize(valBuff)
headers.put(key, value)
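
The read-to-readFully change matters because `read(buf)` may legitimately return after filling only part of the array, which corrupts deserialization whenever the body or header bytes arrive in several chunks; `readFully` blocks until the whole buffer is filled or throws EOFException. Roughly what readFully guarantees, sketched for a plain InputStream without the real method's extra error handling:

    import java.io.{EOFException, InputStream}

    // Keep reading until `buf` is completely filled, as DataInput.readFully does.
    def readFullyByHand(in: InputStream, buf: Array[Byte]): Unit = {
      var off = 0
      while (off < buf.length) {
        val n = in.read(buf, off, buf.length - off) // may return fewer bytes than requested
        if (n < 0) throw new EOFException("stream ended before the buffer was filled")
        off += n
      }
    }
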
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fe03ae4a629b9..5f91f966bb26b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
+ val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
+ p.next._2.iterator.map(_.copy())
}
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
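
The extra `.copy()` in compute is needed because EdgePartition's iterator reuses a single Edge instance to cut allocation (see the doc comments added to EdgePartition below). A self-contained sketch of what goes wrong when such an iterator's output is retained without copying (the Cell class and iterator are hypothetical, not GraphX code):

    // A hypothetical iterator that reuses one mutable object, like EdgePartition's.
    class Cell(var value: Int)

    def reusingIterator(values: Array[Int]): Iterator[Cell] = new Iterator[Cell] {
      private val cell = new Cell(0)
      private var i = 0
      def hasNext = i < values.length
      def next() = { cell.value = values(i); i += 1; cell }
    }

    // Retaining the references directly keeps three pointers to the same Cell,
    // so every slot reflects the last value written:
    val retained = reusingIterator(Array(1, 2, 3)).toArray
    assert(retained.map(_.value).toSeq == Seq(3, 3, 3))

    // Copying each element before it is retained preserves the values:
    val copied = reusingIterator(Array(1, 2, 3)).map(c => new Cell(c.value)).toArray
    assert(copied.map(_.value).toSeq == Seq(1, 2, 3))
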
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 57fa5eefd5e09..2e05f5d4e4969 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* Construct a new edge partition by applying the function f to all
* edges in this partition.
*
+ * Be careful not to keep references to the objects passed to `f`.
+ * To improve GC performance the same object is re-used for each call.
+ *
* @param f a function from an edge to a new attribute
* @tparam ED2 the type of the new attribute
* @return a new edge partition with the result of the function `f`
@@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* order of the edges returned by `EdgePartition.iterator` and
* should return attributes equal to the number of edges.
*
- * @param f a function from an edge to a new attribute
+ * @param iter an iterator for the new attribute values
* @tparam ED2 the type of the new attribute
- * @return a new edge partition with the result of the function `f`
- * applied to each edge
+ * @return a new edge partition with the attribute values replaced
*/
def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+ // Faster than iter.toArray, because the expected size is known.
val newData = new Array[ED2](data.size)
var i = 0
while (iter.hasNext) {
@@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
/**
* Get an iterator over the edges in this partition.
*
+ * Be careful not to keep references to the objects from this iterator.
+ * To improve GC performance the same object is re-used in `next()`.
+ *
* @return an iterator over edges in the partition
*/
def iterator = new Iterator[Edge[ED]] {
@@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
+ *
+ * Be careful not to keep references to the objects from this iterator. To improve GC performance
+ * the same object is re-used in `next()`.
*/
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 886c250d7cffd..220a89d73d711 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
// Current position in the array.
private var pos = 0
- // A triplet object that this iterator.next() call returns. We reuse this object to avoid
- // allocating too many temporary Java objects.
- private val triplet = new EdgeTriplet[VD, ED]
-
private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
+ val triplet = new EdgeTriplet[VD, ED]
triplet.srcId = edgePartition.srcIds(pos)
- // assert(vmap.containsKey(e.src.id))
triplet.srcAttr = vmap(triplet.srcId)
triplet.dstId = edgePartition.dstIds(pos)
- // assert(vmap.containsKey(e.dst.id))
triplet.dstAttr = vmap(triplet.dstId)
triplet.attr = edgePartition.data(pos)
pos += 1
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
new file mode 100644
index 0000000000000..9cbb2d2acdc2d
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+
+class EdgeTripletIteratorSuite extends FunSuite {
+ test("iterator.toList") {
+ val builder = new EdgePartitionBuilder[Int]
+ builder.add(1, 2, 0)
+ builder.add(1, 3, 0)
+ builder.add(1, 4, 0)
+ val vidmap = new VertexIdToIndexMap
+ vidmap.add(1)
+ vidmap.add(2)
+ vidmap.add(3)
+ vidmap.add(4)
+ val vs = Array.fill(vidmap.capacity)(0)
+ val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
+ val result = iter.toList.map(et => (et.srcId, et.dstId))
+ assert(result === Seq((1, 2), (1, 3), (1, 4)))
+ }
+}
diff --git a/pom.xml b/pom.xml
index ae99fea709289..3df25255fb82a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,6 +184,11 @@
        <artifactId>guava</artifactId>
        <version>14.0.1</version>
      </dependency>
+      <dependency>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>1.5</version>
+      </dependency>
      <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 57742f8eca134..bdf883e0ddc54 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -280,6 +280,7 @@ object SparkBuild extends Build {
"colt" % "colt" % "1.2.0",
"org.apache.mesos" % "mesos" % "0.13.0",
"net.java.dev.jets3t" % "jets3t" % "0.7.1",
+ "commons-codec" % "commons-codec" % "1.5", // Prevent jets3t from including the older version of commons-codec
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.avro" % "avro" % "1.7.4",
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 6a7c23a069bf8..eb5dbb8de2b39 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -933,7 +933,7 @@ def _change_cell_value(cell, newval):
Note: These can never be renamed due to client compatibility issues"""
def _getobject(modname, attribute):
- mod = __import__(modname)
+ mod = __import__(modname, fromlist=[attribute])
return mod.__dict__[attribute]
def _generateImage(size, mode, str_rep):
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a26ee00aadc7e..6956ec820659f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -943,7 +943,7 @@ def fullOuterJoin(self, other, numPartitions=None):
return python_full_outer_join(self, other, numPartitions)
# TODO: add option to control map-side combining
- def partitionBy(self, numPartitions, partitionFunc=hash):
+ def partitionBy(self, numPartitions, partitionFunc=None):
"""
Return a copy of the RDD partitioned using the specified partitioner.
@@ -954,6 +954,9 @@ def partitionBy(self, numPartitions, partitionFunc=hash):
"""
if numPartitions is None:
numPartitions = self.ctx.defaultParallelism
+
+ if partitionFunc is None:
+ partitionFunc = lambda x: 0 if x is None else hash(x)
# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 527104587fd31..c741bfb75a6ab 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -33,6 +33,14 @@
from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int
+_have_scipy = False
+try:
+ import scipy.sparse
+ _have_scipy = True
+except:
+ # No SciPy, but that's okay, we'll skip those tests
+ pass
+
class PySparkTestCase(unittest.TestCase):
@@ -234,5 +242,22 @@ def test_termination_sigterm(self):
from signal import SIGTERM
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
+
+@unittest.skipIf(not _have_scipy, "SciPy not installed")
+class SciPyTests(PySparkTestCase):
+ """General PySpark tests that depend on scipy """
+
+ def test_serialize(self):
+ from scipy.special import gammaln
+ x = range(1, 5)
+ expected = map(gammaln, x)
+ observed = self.sc.parallelize(x).map(gammaln).collect()
+ self.assertEqual(expected, observed)
+
+
if __name__ == "__main__":
+ if not _have_scipy:
+ print "NOTE: Skipping SciPy tests as it does not seem to be installed"
unittest.main()
+ if not _have_scipy:
+ print "NOTE: SciPy tests were skipped as it does not seem to be installed"
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 79345fbf6096a..9a1d3d5557de9 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -879,6 +879,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
def process(settings: Settings): Boolean = savingContextLoader {
this.settings = settings
+ if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
createInterpreter()
// sets in to some kind of reader depending on environmental cues
@@ -936,16 +937,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
- val master = this.master match {
- case Some(m) => m
- case None => {
- val prop = System.getenv("MASTER")
- if (prop != null) prop else "local"
- }
- }
val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
val conf = new SparkConf()
- .setMaster(master)
+ .setMaster(getMaster())
.setAppName("Spark shell")
.setJars(jars)
.set("spark.repl.class.uri", intp.classServer.uri)
@@ -960,6 +954,17 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
sparkContext
}
+ private def getMaster(): String = {
+ val master = this.master match {
+ case Some(m) => m
+ case None => {
+ val prop = System.getenv("MASTER")
+ if (prop != null) prop else "local"
+ }
+ }
+ master
+ }
+
/** process command-line arguments and do as they request */
def process(args: Array[String]): Boolean = {
val command = new SparkCommandLine(args.toList, msg => echo(msg))
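
With the refactor above, the yarn-client check and createSparkContext share getMaster(), so both use the same resolution order: an explicitly set master, then the MASTER environment variable, then "local". A standalone restatement of that order (not REPL code; the helper name is illustrative):

    // Resolution order used by getMaster(): explicit setting, then MASTER env, then "local".
    def resolveMaster(explicit: Option[String], env: Map[String, String]): String =
      explicit.getOrElse(env.getOrElse("MASTER", "local"))

    assert(resolveMaster(None, Map.empty) == "local")
    assert(resolveMaster(None, Map("MASTER" -> "yarn-client")) == "yarn-client")
    assert(resolveMaster(Some("spark://host:7077"), Map("MASTER" -> "local")) == "spark://host:7077")
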
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6956cd65c7209..9e5e2d5ceaca1 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
@@ -264,6 +263,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
// handle any add jars
+ val cachedSecondaryJarLinks = ListBuffer.empty[String]
if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
@@ -271,9 +271,11 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- linkname, statCache, true)
+ linkname, statCache)
+ cachedSecondaryJarLinks += linkname
}
}
+ sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
// handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
@@ -462,9 +464,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
object Client {
- val SPARK_JAR: String = "spark.jar"
- val APP_JAR: String = "app.jar"
+ val SPARK_JAR: String = "__spark__.jar"
+ val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
+ val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
@@ -491,11 +494,19 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
+
+ val cachedSecondaryJarLinks =
+ sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
+ .filter(_.nonEmpty)
+
// Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
+ cachedSecondaryJarLinks.foreach(jarLink =>
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
@@ -504,6 +515,9 @@ object Client {
if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
+ cachedSecondaryJarLinks.foreach(jarLink =>
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8ccdea663a11d..6ff8c6c3b2497 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import scala.collection.mutable.{ListBuffer, HashMap, Map}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
@@ -281,18 +280,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
// Handle jars local to the ApplicationMaster.
+ val cachedSecondaryJarLinks = ListBuffer.empty[String]
if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
- // Only add the resource to the Spark ApplicationMaster.
- val appMasterOnly = true
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
- linkname, statCache, appMasterOnly)
+ linkname, statCache)
+ cachedSecondaryJarLinks += linkname
}
}
+ sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
// Handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
@@ -478,9 +478,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
object Client {
- val SPARK_JAR: String = "spark.jar"
- val APP_JAR: String = "app.jar"
+ val SPARK_JAR: String = "__spark__.jar"
+ val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
+ val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
@@ -507,12 +508,19 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
+
+ val cachedSecondaryJarLinks =
+ sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
+ .filter(_.nonEmpty)
+
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
- .toBoolean
+ val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
+ cachedSecondaryJarLinks.foreach(jarLink =>
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
@@ -521,6 +529,9 @@ object Client {
if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
+ cachedSecondaryJarLinks.foreach(jarLink =>
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")