
SKIPME Pickup recent bug fixes from Apache branch-0.9 #19

Merged · 12 commits · Jun 16, 2014
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._

import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, Logging}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{AkkaUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
@@ -60,12 +61,14 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Slave registration failed: " + message)
System.exit(1)

case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val ser = SparkEnv.get.closureSerializer.newInstance()
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
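
The executor-side change above means LaunchTask now carries opaque bytes rather than a TaskDescription object, and the executor rebuilds the object with SparkEnv.get.closureSerializer before launching it. A minimal, Spark-free sketch of that round trip, with a hypothetical DemoTask and plain Java serialization standing in for TaskDescription and the closure serializer:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for TaskDescription: any Serializable payload works here.
case class DemoTask(taskId: Long, serializedTask: Array[Byte]) extends Serializable

// Serialize with plain Java serialization (Spark would use its closure serializer).
def serialize(obj: AnyRef): ByteBuffer = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  ByteBuffer.wrap(bytes.toByteArray)
}

def deserialize[T](buf: ByteBuffer): T = {
  val in = new ObjectInputStream(new ByteArrayInputStream(buf.array()))
  in.readObject().asInstanceOf[T]
}

// Driver side ships raw bytes; executor side rebuilds the task before launching it.
val wire = serialize(DemoTask(42L, Array.fill(16)(0.toByte)))
val task = deserialize[DemoTask](wire)
assert(task.taskId == 42L)
```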

@@ -29,7 +29,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object CoarseGrainedClusterMessages {

// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
@@ -27,12 +27,12 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.{SparkException, SparkEnv, Logging, TaskState}
import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}

/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -50,6 +50,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
var totalCoreCount = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -139,8 +140,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
executorActor(task.executorId) ! LaunchTask(task)
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - 1024) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
try {
var msg = "Serialized task %s:%d was %d bytes which " +
"exceeds spark.akka.frameSize (%d bytes). " +
"Consider using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
taskSet.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
freeCores(task.executorId) -= 1
executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
}
}
}
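
The driver-side guard above serializes each task itself and compares the result against the Akka frame size (minus some headroom) before sending, aborting the task set with a pointer to broadcast variables when a task cannot fit in a single message. A small sketch of just the size check, with hard-coded numbers standing in for AkkaUtils.maxFrameSizeBytes and the reserved overhead:

```scala
import java.nio.ByteBuffer

// Hypothetical numbers: Spark reads the real frame size from spark.akka.frameSize
// via AkkaUtils.maxFrameSizeBytes(conf); 1024 bytes are reserved for message overhead.
val akkaFrameSize = 10 * 1024 * 1024
val reservedBytes = 1024

// Returns true when the serialized task fits in a single Akka frame.
def fitsInFrame(serializedTask: ByteBuffer): Boolean =
  serializedTask.limit < akkaFrameSize - reservedBytes

// Usage: a small task is sent; one near the frame size would abort its task set
// with the "Consider using broadcast variables for large values" message instead.
assert(fitsInFrame(ByteBuffer.allocate(1024)))
assert(!fitsInFrame(ByteBuffer.allocate(akkaFrameSize)))
```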

@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator

import scala.collection.BufferedIterator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@@ -230,7 +231,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val inputStreams = Seq(sortedMap) ++ spilledMaps
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
@@ -245,13 +246,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
while (it.hasNext && kc._1.hashCode() == minHash) {
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
@@ -324,7 +325,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {

def isEmpty = pairs.length == 0
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
import org.apache.spark.util.{SerializableBuffer, AkkaUtils}

import org.scalatest.FunSuite

class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {

test("serialized task larger than akka frame size") {
val conf = new SparkConf
conf.set("spark.akka.frameSize","1")
conf.set("spark.default.parallelism","1")
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
val thrown = intercept[SparkException] {
larger.collect()
}
assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
val smaller = sc.parallelize(1 to 4).collect()
assert(smaller.size === 4)
}

}
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
("pomatoes", "eructation") // 568647356
)

collisionPairs.foreach { case (w1, w2) =>
// String.hashCode is documented to use a specific algorithm, but check just in case
assert(w1.hashCode === w2.hashCode)
}

(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
assert(count == 100000 + collisionPairs.size * 2)
assert(count === 100000 + collisionPairs.size * 2)
}

test("spilling with many hash collisions") {
val conf = new SparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)

// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
for (i <- 1 to 10) {
for (j <- 1 to 10000) {
map.insert(FixedHashObject(j, j % 2), 1)
}
}

val it = map.iterator
var count = 0
while (it.hasNext) {
val kv = it.next()
assert(kv._2 === 10)
count += 1
}
assert(count === 10000)
}

test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
}
}

/**
* A dummy class that always returns the same hash code, to easily test hash collisions
*/
case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
override def hashCode(): Int = h
}
14 changes: 12 additions & 2 deletions ec2/spark_ec2.py
@@ -194,7 +194,12 @@ def get_spark_ami(opts):
"c3.xlarge": "pvm",
"c3.2xlarge": "pvm",
"c3.4xlarge": "pvm",
"c3.8xlarge": "pvm"
"c3.8xlarge": "pvm",
"r3.large": "hvm",
"r3.xlarge": "hvm",
"r3.2xlarge": "hvm",
"r3.4xlarge": "hvm",
"r3.8xlarge": "hvm"
}
if opts.instance_type in instance_types:
instance_type = instance_types[opts.instance_type]
@@ -496,7 +501,12 @@ def get_num_disks(instance_type):
"c3.xlarge": 2,
"c3.2xlarge": 2,
"c3.4xlarge": 2,
"c3.8xlarge": 2
"c3.8xlarge": 2,
"r3.large": 1,
"r3.xlarge": 1,
"r3.2xlarge": 1,
"r3.4xlarge": 1,
"r3.8xlarge": 2
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
@@ -61,20 +61,20 @@ class SparkFlumeEvent() extends Externalizable {
def readExternal(in: ObjectInput) {
val bodyLength = in.readInt()
val bodyBuff = new Array[Byte](bodyLength)
in.read(bodyBuff)
in.readFully(bodyBuff)

val numHeaders = in.readInt()
val headers = new java.util.HashMap[CharSequence, CharSequence]

for (i <- 0 until numHeaders) {
val keyLength = in.readInt()
val keyBuff = new Array[Byte](keyLength)
in.read(keyBuff)
in.readFully(keyBuff)
val key : String = Utils.deserialize(keyBuff)

val valLength = in.readInt()
val valBuff = new Array[Byte](valLength)
in.read(valBuff)
in.readFully(valBuff)
val value : String = Utils.deserialize(valBuff)

headers.put(key, value)
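
The switch from read to readFully matters because InputStream.read may return after filling only part of the buffer, silently truncating the Flume event body or header bytes, while DataInput.readFully keeps reading until the buffer is full or throws. A sketch of the difference, using a contrived TrickleInputStream that delivers one byte per call:

```scala
import java.io.{ByteArrayInputStream, DataInputStream, FilterInputStream, InputStream}

// An input stream that hands back at most one byte per read() call,
// the way a slow socket or pipe legitimately might.
class TrickleInputStream(in: InputStream) extends FilterInputStream(in) {
  override def read(b: Array[Byte], off: Int, len: Int): Int =
    super.read(b, off, math.min(len, 1))
}

val data = "hello flume".getBytes("UTF-8")

// read() is allowed to fill only part of the buffer in one call...
val partial = new Array[Byte](data.length)
val n = new TrickleInputStream(new ByteArrayInputStream(data)).read(partial)
assert(n == 1)

// ...while readFully() loops until the whole buffer has been filled.
val full = new Array[Byte](data.length)
new DataInputStream(new TrickleInputStream(new ByteArrayInputStream(data))).readFully(full)
assert(new String(full, "UTF-8") == "hello flume")
```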
3 changes: 2 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
p.next._2.iterator.map(_.copy())
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
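
Because EdgePartition.iterator reuses a single Edge object across next() calls (see the GC notes added to EdgePartition below), compute must copy each edge before exposing it, or every materialized element would alias the same mutated object. A Spark-free sketch of the pitfall, with a made-up mutable Cell standing in for Edge:

```scala
// A tiny mutable record, reused by the iterator the way EdgePartition reuses its Edge.
class Cell(var value: Int) {
  def copy(): Cell = new Cell(value)
}

// An iterator that mutates and returns the same object on every next() call.
def reusingIterator(values: Array[Int]): Iterator[Cell] = new Iterator[Cell] {
  private val cell = new Cell(0)
  private var i = 0
  override def hasNext: Boolean = i < values.length
  override def next(): Cell = {
    cell.value = values(i)
    i += 1
    cell
  }
}

// Without copying, every element of the materialized array is the same object,
// holding only the last value that was written into it.
val aliased = reusingIterator(Array(1, 2, 3)).toArray
assert(aliased.map(_.value).toSeq == Seq(3, 3, 3))

// Mapping through copy() first keeps each element independent, as compute() now does.
val copied = reusingIterator(Array(1, 2, 3)).map(_.copy()).toArray
assert(copied.map(_.value).toSeq == Seq(1, 2, 3))
```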
@@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* Construct a new edge partition by applying the function f to all
* edges in this partition.
*
* Be careful not to keep references to the objects passed to `f`.
* To improve GC performance the same object is re-used for each call.
*
* @param f a function from an edge to a new attribute
* @tparam ED2 the type of the new attribute
* @return a new edge partition with the result of the function `f`
@@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* order of the edges returned by `EdgePartition.iterator` and
* should return attributes equal to the number of edges.
*
* @param f a function from an edge to a new attribute
* @param iter an iterator for the new attribute values
* @tparam ED2 the type of the new attribute
* @return a new edge partition with the result of the function `f`
* applied to each edge
* @return a new edge partition with the attribute values replaced
*/
def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
// Faster than iter.toArray, because the expected size is known.
val newData = new Array[ED2](data.size)
var i = 0
while (iter.hasNext) {
@@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
/**
* Get an iterator over the edges in this partition.
*
* Be careful not to keep references to the objects from this iterator.
* To improve GC performance the same object is re-used in `next()`.
*
* @return an iterator over edges in the partition
*/
def iterator = new Iterator[Edge[ED]] {
@@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
*/
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
@@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
// Current position in the array.
private var pos = 0

// A triplet object that this iterator.next() call returns. We reuse this object to avoid
// allocating too many temporary Java objects.
private val triplet = new EdgeTriplet[VD, ED]

private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)

override def hasNext: Boolean = pos < edgePartition.size

override def next() = {
val triplet = new EdgeTriplet[VD, ED]
triplet.srcId = edgePartition.srcIds(pos)
// assert(vmap.containsKey(e.src.id))
triplet.srcAttr = vmap(triplet.srcId)
triplet.dstId = edgePartition.dstIds(pos)
// assert(vmap.containsKey(e.dst.id))
triplet.dstAttr = vmap(triplet.dstId)
triplet.attr = edgePartition.data(pos)
pos += 1