merged Apache bug fixes #54

Merged: 26 commits into csd-1.2 from branch-1.2 on May 6, 2015

Commits
758ebf7
SPARK-6480 [CORE] histogram() bucket function is wrong in some simple…
srowen Mar 26, 2015
a73055f
[SPARK-6667] [PySpark] remove setReuseAddress
Apr 2, 2015
8fa09a4
SPARK-6414: Spark driver failed with NPE on job cancelation
Apr 2, 2015
d82e732
[SPARK-6578] [core] Fix thread-safety issue in outbound path of netw…
rxin Apr 2, 2015
2991dd0
[SPARK-5195][sql] Update HiveMetastoreCatalog.scala (override the Metas…
seayi Feb 3, 2015
f4a9c41
[CORE] The description of jobHistory config should be spark.history.fs…
Apr 3, 2015
eac9525
[HOTFIX] Updating CHANGES.txt for Spark 1.2.2
pwendell Apr 5, 2015
7b7db59
Preparing development version 1.2.3-SNAPSHOT
pwendell Apr 5, 2015
86d1715
[HOTFIX] Bumping versions for Spark 1.2.2
pwendell Apr 5, 2015
7531b50
Preparing Spark release v1.2.2-rc1
pwendell Apr 5, 2015
f7fe87f
[SPARK-6209] Clean up connections in ExecutorClassLoader after failin…
JoshRosen Apr 5, 2015
7a15839
[SPARK-6753] Clone SparkConf in ShuffleSuite tests
kayousterhout Apr 8, 2015
daec1c6
[SPARK-5969][PySpark] Fix descending pyspark.rdd.sortByKey.
foxik Apr 10, 2015
899ffdc
SPARK-6878 [CORE] Fix for sum on empty RDD fails with exception
Apr 14, 2015
3c13936
[SPARK-6905] Upgrade to snappy-java 1.1.1.7
JoshRosen Apr 14, 2015
5845a62
[SPARK-5634] [core] Show correct message in HS when no incomplete app…
Apr 15, 2015
964f544
SPARK-4924 addendum. Minor assembly directory fix in load-spark-env.sh
Raschild Apr 9, 2015
8e9fc27
Revert "[SPARK-5634] [core] Show correct message in HS when no incomp…
JoshRosen Apr 15, 2015
9677b44
[SPARK-6886] [PySpark] fix big closure with shuffle
Apr 15, 2015
e1e7fc0
[SPARK-6952] Handle long args when detecting PID reuse
Apr 17, 2015
059c390
[SPARK-6998][MLlib] Make StreamingKMeans 'Serializable'
zsxwing Apr 20, 2015
d208047
[MINOR] [CORE] Warn users who try to cache RDDs with dynamic allocati…
Apr 28, 2015
6fd74d8
Revert "[MINOR] [CORE] Warn users who try to cache RDDs with dynamic …
pwendell Apr 29, 2015
c0bd415
[SPARK-7181] [CORE] fix infinite loop in ExternalSorter's mergeWithAgg…
chouqin Apr 29, 2015
111ad8d
Merge branch 'branch-1.2' of github.com:apache/spark into csd-1.2
markhamstra May 6, 2015
81ae704
fixed csd versions
markhamstra May 6, 2015
1 change: 1 addition & 0 deletions bin/load-spark-env.sh
@@ -20,6 +20,7 @@
# This script loads spark-env.sh if it exists, and ensures it is only loaded once.
# spark-env.sh is loaded from SPARK_CONF_DIR if set, or within the current directory's
# conf/ subdirectory.
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

if [ -z "$SPARK_ENV_LOADED" ]; then
export SPARK_ENV_LOADED=1
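
The added FWDIR line computes the Spark installation root (the parent of bin/) from the script's own location; per the SPARK-4924 addendum commit in the list above, the assembly-directory logic later in this script relies on it. A rough Scala rendering of the same path resolution, with a hypothetical install location:

    import java.nio.file.Paths

    // Equivalent of FWDIR="$(cd "`dirname "$0"`"/..; pwd)": take the
    // script's directory, go one level up, and make the result absolute.
    val script    = Paths.get("/opt/spark/bin/load-spark-env.sh") // hypothetical path
    val sparkHome = script.getParent.getParent.toAbsolutePath.normalize() // /opt/spark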
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -505,7 +505,8 @@ private[spark] class ExternalSorter[K, V, C](
val k = elem._1
var c = elem._2
while (sorted.hasNext && sorted.head._1 == k) {
c = mergeCombiners(c, sorted.head._2)
val pair = sorted.next()
c = mergeCombiners(c, pair._2)
}
(k, c)
}
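
This hunk is the SPARK-7181 fix: sorted is a BufferedIterator, and head only peeks at the next element without consuming it, so the old loop body never advanced the iterator and spun forever once two equal keys met. A minimal Scala sketch of the bug pattern and the fix, with toy values and simple addition standing in for mergeCombiners:

    // Toy reproduction of the SPARK-7181 pattern: BufferedIterator.head
    // peeks without consuming, so a loop conditioned only on head hangs.
    val sorted = Iterator((1, 10), (1, 20), (2, 30)).buffered
    val k = 1
    var c = 0
    while (sorted.hasNext && sorted.head._1 == k) {
      val pair = sorted.next() // the fix: consume the peeked element
      c = c + pair._2          // stand-in for mergeCombiners(c, pair._2)
    }
    // c == 30 here; reading sorted.head._2 without calling next() would
    // merge (1, 10) into c forever and never terminate.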
5 changes: 4 additions & 1 deletion core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -507,7 +507,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val ord = implicitly[Ordering[Int]]
val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))

// avoid combine before spill
sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i)))
sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
val expected = (0 until 3).map(p => {
(p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
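
The reworked test inserts every key twice, across two passes with distinct values, so that merging must happen while reading spilled data rather than at insertion time ("avoid combine before spill"). The expected set follows from the sum combiner: key i receives 2 * i and then 2 * i + 1, which merge to 4 * i + 1, matching the i * 4 + 1 above. A one-line Scala check of that arithmetic:

    // Arithmetic behind the test's expected values, for an example key.
    val i = 7
    assert((2 * i) + (2 * i + 1) == 4 * i + 1)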
2 changes: 1 addition & 1 deletion extras/java8-tests/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion extras/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion extras/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -160,7 +160,7 @@ class StreamingKMeansModel(
class StreamingKMeans(
var k: Int,
var decayFactor: Double,
var timeUnit: String) extends Logging {
var timeUnit: String) extends Logging with Serializable {

def this() = this(2, 1.0, StreamingKMeans.BATCHES)

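
SPARK-6998 adds Serializable because StreamingKMeans is referenced from closures that Spark ships to executors, and a field reference inside a closure captures the whole enclosing instance. A hedged sketch of the general failure mode (Scaler is a made-up class, not Spark API):

    import org.apache.spark.rdd.RDD

    // A field reference inside an RDD closure captures `this`, so the
    // enclosing class is serialized with the task; without Serializable
    // the job fails with java.io.NotSerializableException.
    class Scaler(val factor: Double) extends Serializable {
      def scale(rdd: RDD[Double]): RDD[Double] =
        rdd.map(x => x * factor) // closure captures `this` via `factor`
    }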
2 changes: 1 addition & 1 deletion network/yarn/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion pom.xml
@@ -425,7 +425,7 @@
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.1.6</version>
<version>1.1.1.7</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
14 changes: 6 additions & 8 deletions python/pyspark/rdd.py
@@ -1100,7 +1100,7 @@ def take(self, num):
[91, 92, 93]
"""
items = []
totalParts = self._jrdd.partitions().size()
totalParts = self.getNumPartitions()
partsScanned = 0

while len(items) < num and partsScanned < totalParts:
@@ -2105,12 +2105,9 @@ def pipeline_func(split, iterator):
self._jrdd_deserializer = self.ctx.serializer
self._bypass_serializer = False
self._partitionFunc = prev._partitionFunc if self.preservesPartitioning else None
self._broadcast = None

def __del__(self):
if self._broadcast:
self._broadcast.unpersist()
self._broadcast = None
def getNumPartitions(self):
return self._prev_jrdd.partitions().size()

@property
def _jrdd(self):
@@ -2126,8 +2123,9 @@ def _jrdd(self):
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
self._broadcast = self.ctx.broadcast(pickled_command)
pickled_command = ser.dumps(self._broadcast)
# The broadcast will have the same life cycle as the created PythonRDD
broadcast = self.ctx.broadcast(pickled_command)
pickled_command = ser.dumps(broadcast)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
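
This is the SPARK-6886 fix. Previously the broadcast created for an oversized pickled command was unpersisted in __del__ as soon as the Python RDD object was garbage-collected, even though retried or pipelined tasks could still need it after a shuffle; now the broadcast simply lives as long as the PythonRDD that references it. A hedged Scala sketch of the underlying pattern, shipping a large payload once per executor via a broadcast rather than inside every task closure (a local-mode toy, not the PySpark internals):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("broadcast-sketch"))
        // A payload past the 1 MB threshold seen above is broadcast once
        // rather than serialized into each task.
        val payload = Array.fill(1 << 20)(1.toByte)
        val bc = sc.broadcast(payload)
        val sizes = sc.parallelize(1 to 4).map(_ => bc.value.length).collect()
        println(sizes.mkString(",")) // 1048576, repeated four times
        sc.stop()
      }
    }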
6 changes: 2 additions & 4 deletions python/pyspark/tests.py
@@ -521,10 +521,8 @@ def test_large_closure(self):
data = [float(i) for i in xrange(N)]
rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
self.assertEquals(N, rdd.first())
self.assertTrue(rdd._broadcast is not None)
rdd = self.sc.parallelize(range(1), 1).map(lambda x: 1)
self.assertEqual(1, rdd.first())
self.assertTrue(rdd._broadcast is None)
# regression test for SPARK-6886
self.assertEqual(1, rdd.map(lambda x: (x, 1)).groupByKey().count())

def test_zip_with_different_serializers(self):
a = self.sc.parallelize(range(5))
4 changes: 2 additions & 2 deletions sbin/spark-daemon.sh
@@ -130,7 +130,7 @@ case $option in

if [ -f $pid ]; then
TARGET_ID="$(cat "$pid")"
if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
echo "$command running as process $TARGET_ID. Stop it first."
exit 1
fi
@@ -155,7 +155,7 @@ case $option in
echo $newpid > $pid
sleep 2
# Check if the process has died; in that case we'll tail the log so the user can see
if [[ ! $(ps -p "$newpid" -o args=) =~ $command ]]; then
if [[ $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
echo "failed to launch $command:"
tail -2 "$log" | sed 's/^/ /'
echo "full log in $log"
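
SPARK-6952: matching the full argument list (ps -o args=) against $command could fail when the arguments were long enough for ps to truncate them, and could mis-identify a recycled PID whose unrelated command line happened to contain the daemon's class name; matching the short command name (ps -o comm=) against "java" is coarser but stable. A hedged Scala sketch of the same check (looksLikeJvm is a made-up helper, not part of Spark):

    import scala.sys.process._
    import scala.util.Try

    // Ask ps for the short command name (comm) of a PID and test whether
    // it is a JVM; ps exits nonzero for a dead PID, which Try absorbs.
    def looksLikeJvm(pid: Int): Boolean =
      Try(Seq("ps", "-p", pid.toString, "-o", "comm=").!!.trim)
        .toOption.exists(_.contains("java"))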
2 changes: 1 addition & 1 deletion yarn/alpha/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>yarn-parent_2.10</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<properties>
2 changes: 1 addition & 1 deletion yarn/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion yarn/stable/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>yarn-parent_2.10</artifactId>
<version>1.2.2-csd-1-SNAPSHOT</version>
<version>1.2.2-csd-5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<properties>