diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh
index 356b3d49b2ffe..f33a15d3dca4e 100644
--- a/bin/load-spark-env.sh
+++ b/bin/load-spark-env.sh
@@ -20,6 +20,7 @@
# This script loads spark-env.sh if it exists, and ensures it is only loaded once.
# spark-env.sh is loaded from SPARK_CONF_DIR if set, or within the current directory's
# conf/ subdirectory.
+FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
if [ -z "$SPARK_ENV_LOADED" ]; then
export SPARK_ENV_LOADED=1
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 15bda1c9cc29c..d82e64564c1dc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -505,7 +505,8 @@ private[spark] class ExternalSorter[K, V, C](
val k = elem._1
var c = elem._2
while (sorted.hasNext && sorted.head._1 == k) {
- c = mergeCombiners(c, sorted.head._2)
+ val pair = sorted.next()
+ c = mergeCombiners(c, pair._2)
}
(k, c)
}
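
The ExternalSorter change above fixes an infinite loop in the read-time merge path: the old body only peeked at sorted.head and never advanced the iterator, so any run of duplicate keys spun forever. A minimal standalone sketch of the corrected pattern (illustrative names, not Spark's internal API):

    // Collapse consecutive entries that share a key, drawn from a sorted, buffered iterator.
    // The essential point of the fix: the inner loop must consume sorted.next();
    // peeking at sorted.head alone never terminates once a key repeats.
    def mergeByKey[K, C](sorted: BufferedIterator[(K, C)],
                         mergeCombiners: (C, C) => C): Iterator[(K, C)] =
      new Iterator[(K, C)] {
        override def hasNext: Boolean = sorted.hasNext
        override def next(): (K, C) = {
          val elem = sorted.next()
          val k = elem._1
          var c = elem._2
          while (sorted.hasNext && sorted.head._1 == k) {
            val pair = sorted.next()   // advance past the duplicate key
            c = mergeCombiners(c, pair._2)
          }
          (k, c)
        }
      }

For example, mergeByKey(Iterator((1, 2), (1, 3), (2, 5)).buffered, (a: Int, b: Int) => a + b).toList yields List((1, 5), (2, 5)).
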
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 3cb42d416de4f..3581540db6233 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -507,7 +507,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val ord = implicitly[Ordering[Int]]
val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
- sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
+
+ // avoid combine before spill
+    sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i)))
+ sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
val expected = (0 until 3).map(p => {
(p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
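
The reworked test above inserts every key twice, once per insertAll pass, so a key's two values are not combined in memory before spilling and must instead be merged by mergeCombiners when the spilled runs are read back, which is the path fixed in ExternalSorter. The expectation follows because the two values for key i merge to (2 * i) + (2 * i + 1) = 4 * i + 1; a quick sanity check of that identity:

    // Sanity check for the expectation used in the test above: merging the two
    // inserted values for each key i yields 4 * i + 1.
    assert((0 until 50000).forall(i => (2 * i) + (2 * i + 1) == 4 * i + 1))
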
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
index 659006e1d4781..891b77d692192 100644
--- a/extras/java8-tests/pom.xml
+++ b/extras/java8-tests/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index 48c7ffba4396a..bcfa40abb14cd 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml
index db9c0567dd9e2..aadff64113a83 100644
--- a/extras/spark-ganglia-lgpl/pom.xml
+++ b/extras/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index 6189dce9b27da..5ab7e1aa58db9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -160,7 +160,7 @@ class StreamingKMeansModel(
class StreamingKMeans(
var k: Int,
var decayFactor: Double,
- var timeUnit: String) extends Logging {
+ var timeUnit: String) extends Logging with Serializable {
def this() = this(2, 1.0, StreamingKMeans.BATCHES)
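
StreamingKMeans is made Serializable above presumably because trainOn registers a foreachRDD closure that updates the model field, so the closure captures the enclosing instance and that instance must survive serialization of the streaming job (for example when the DStream graph is checkpointed). A minimal sketch of the general rule, with hypothetical names rather than the StreamingKMeans code itself:

    import org.apache.spark.rdd.RDD

    // Hypothetical example: a class whose fields are referenced from a closure
    // handed to Spark must be Serializable, because the closure is serialized
    // together with the enclosing instance it captures.
    class Averager(var weight: Double) extends Serializable {
      // Referencing `weight` inside the lambda pulls `this` into the closure;
      // without `extends Serializable` this fails with "Task not serializable".
      def weightedSum(data: RDD[Double]): Double =
        data.map(x => x * weight).reduce(_ + _)
    }
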
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
index f4ea90835dd2c..2d3dca5bf9fe2 100644
--- a/network/yarn/pom.xml
+++ b/network/yarn/pom.xml
@@ -22,7 +22,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/pom.xml b/pom.xml
index 767539ca0f1bc..d452af6613c3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -425,7 +425,7 @@
      <dependency>
        <groupId>org.xerial.snappy</groupId>
        <artifactId>snappy-java</artifactId>
-        <version>1.1.1.6</version>
+        <version>1.1.1.7</version>
      </dependency>
      <dependency>
        <groupId>net.jpountz.lz4</groupId>
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9463519df0942..6903bc8f646da 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1100,7 +1100,7 @@ def take(self, num):
[91, 92, 93]
"""
items = []
- totalParts = self._jrdd.partitions().size()
+ totalParts = self.getNumPartitions()
partsScanned = 0
while len(items) < num and partsScanned < totalParts:
@@ -2105,12 +2105,9 @@ def pipeline_func(split, iterator):
self._jrdd_deserializer = self.ctx.serializer
self._bypass_serializer = False
self._partitionFunc = prev._partitionFunc if self.preservesPartitioning else None
- self._broadcast = None
- def __del__(self):
- if self._broadcast:
- self._broadcast.unpersist()
- self._broadcast = None
+ def getNumPartitions(self):
+ return self._prev_jrdd.partitions().size()
@property
def _jrdd(self):
@@ -2126,8 +2123,9 @@ def _jrdd(self):
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
- self._broadcast = self.ctx.broadcast(pickled_command)
- pickled_command = ser.dumps(self._broadcast)
+        # The broadcast will have the same life cycle as the created PythonRDD
+ broadcast = self.ctx.broadcast(pickled_command)
+ pickled_command = ser.dumps(broadcast)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7cb4645899a23..43a1b60a7230f 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -521,10 +521,8 @@ def test_large_closure(self):
data = [float(i) for i in xrange(N)]
rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
self.assertEquals(N, rdd.first())
- self.assertTrue(rdd._broadcast is not None)
- rdd = self.sc.parallelize(range(1), 1).map(lambda x: 1)
- self.assertEqual(1, rdd.first())
- self.assertTrue(rdd._broadcast is None)
+ # regression test for SPARK-6886
+ self.assertEqual(1, rdd.map(lambda x: (x, 1)).groupByKey().count())
def test_zip_with_different_serializers(self):
a = self.sc.parallelize(range(5))
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 5e812a1d91c6b..3022d16ef8bce 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -130,7 +130,7 @@ case $option in
if [ -f $pid ]; then
TARGET_ID="$(cat "$pid")"
- if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
+ if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
echo "$command running as process $TARGET_ID. Stop it first."
exit 1
fi
@@ -155,7 +155,7 @@ case $option in
echo $newpid > $pid
sleep 2
# Check if the process has died; in that case we'll tail the log so the user can see
- if [[ ! $(ps -p "$newpid" -o args=) =~ $command ]]; then
+      if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
echo "failed to launch $command:"
tail -2 "$log" | sed 's/^/ /'
echo "full log in $log"
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
index 9891b4cc26072..7d281462cd97b 100644
--- a/yarn/alpha/pom.xml
+++ b/yarn/alpha/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>yarn-parent_2.10</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index e46515190c24f..490a34c8cfcf7 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
index 26c75af5b7305..c837814910fb9 100644
--- a/yarn/stable/pom.xml
+++ b/yarn/stable/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>yarn-parent_2.10</artifactId>
-    <version>1.2.2-csd-1-SNAPSHOT</version>
+    <version>1.2.2-csd-5-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>