SKIPME merged Apache branch-1.2 #62

Merged 11 commits on Jun 23, 2015

@@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging {
private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
if (scheme == null) {
Seq.empty
} else {
FileSystem.getAllStatistics
.filter { stats => scheme.equals(stats.getScheme()) }
.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}
}

private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
49 changes: 47 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@

package org.apache.spark.io

import java.io.{InputStream, OutputStream}
import java.io.{IOException, InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -122,8 +122,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
new SnappyOutputStream(s, blockSize)
new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
}

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

/**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
* of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
*/
private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {

private[this] var closed: Boolean = false

override def write(b: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte]): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b, off, len)
}

override def flush(): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.flush()
}

override def close(): Unit = {
if (!closed) {
closed = true
os.close()
}
}
}
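
A hedged usage sketch (not part of this PR; the object name and setup below are illustrative, and it assumes spark-core plus snappy-java on the classpath): with the wrapper in place, re-closing or writing to a closed Snappy stream fails cleanly instead of corrupting the buffers snappy-java recycles internally.

```scala
import java.io.{ByteArrayOutputStream, IOException}

import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

object SnappyCloseGuardSketch {
  def main(args: Array[String]): Unit = {
    val codec = new SnappyCompressionCodec(new SparkConf())
    val out = codec.compressedOutputStream(new ByteArrayOutputStream())

    out.write(Array[Byte](1, 2, 3))
    out.close()
    out.close()      // second close is now a silent no-op

    try {
      out.write(4)   // write-after-close is rejected with an IOException
    } catch {
      case e: IOException => println(s"rejected as expected: ${e.getMessage}")
    }
  }
}
```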
@@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File}
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
import org.apache.spark.util.Utils

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
@@ -106,4 +107,23 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou
}
}
}

test("getFileSystemThreadStatistics should guard against null schemes (SPARK-8062)") {
val tempDir = Utils.createTempDir()
val outPath = new File(tempDir, "outfile")

// Intentionally call this method with a null scheme, which will store an entry for a FileSystem
// with a null scheme into Hadoop's global `FileSystem.statisticsTable`.
FileSystem.getStatistics(null, classOf[FileSystem])

// Prior to fixing SPARK-8062, this would fail with a NullPointerException in
// SparkHadoopUtil.getFileSystemThreadStatistics
try {
val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
rdd.saveAsTextFile(outPath.toString)
sc.textFile(outPath.toString).count()
} finally {
Utils.deleteRecursively(tempDir)
}
}
}
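
A standalone sketch (plain Scala, no Hadoop dependency) of the null-guard pattern that the SparkHadoopUtil change at the top of this diff applies and that the test above exercises; `Stats` here is a hypothetical stand-in for Hadoop's `FileSystem.Statistics`.

```scala
object NullSchemeGuardSketch {
  // Hypothetical stand-in for FileSystem.Statistics; a FileSystem registered
  // without a scheme yields an entry whose scheme is null.
  final case class Stats(scheme: String)

  def threadStats(pathScheme: String, all: Seq[Stats]): Seq[Stats] = {
    if (pathScheme == null) {
      Seq.empty  // nothing sensible to match against
    } else {
      // Null-safe: the receiver of equals() is known to be non-null, so entries
      // with a null scheme are simply filtered out instead of throwing.
      all.filter(stats => pathScheme.equals(stats.scheme))
    }
  }

  def main(args: Array[String]): Unit = {
    val all = Seq(Stats("hdfs"), Stats(null), Stats("file"))
    // The old form, all.filter(_.scheme.equals(pathScheme)), throws a
    // NullPointerException as soon as it reaches Stats(null).
    println(threadStats("file", all)) // List(Stats(file))
    println(threadStats(null, all))   // List()
  }
}
```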
3 changes: 2 additions & 1 deletion docs/mllib-collaborative-filtering.md
@@ -102,7 +102,8 @@ other signals), you can use the `trainImplicit` method to get better results.

{% highlight scala %}
val alpha = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, alpha)
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)
{% endhighlight %}
</div>

@@ -51,7 +51,7 @@ object SparkPageRank {
showWarning()

val sparkConf = new SparkConf().setAppName("PageRank")
val iters = if (args.length > 0) args(1).toInt else 10
val iters = if (args.length > 1) args(1).toInt else 10
val ctx = new SparkContext(sparkConf)
val lines = ctx.textFile(args(0), 1)
val links = lines.map{ s =>
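
For the SparkPageRank fix above, a small standalone sketch (object and helper names are illustrative) of the off-by-one being corrected: `args(1)` is only safe to read when at least two arguments were supplied.

```scala
object ItersArgSketch {
  // Mirrors the corrected check: fall back to 10 iterations unless a second
  // argument is present. The old `args.length > 0` guard read args(1) even
  // when only the input path was given, throwing ArrayIndexOutOfBoundsException.
  def itersFrom(args: Array[String]): Int =
    if (args.length > 1) args(1).toInt else 10

  def main(args: Array[String]): Unit = {
    println(itersFrom(Array("pagerank_data.txt")))        // 10 (default)
    println(itersFrom(Array("pagerank_data.txt", "25")))  // 25
  }
}
```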
@@ -126,7 +126,7 @@ object DecisionTreeRunner {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -68,7 +68,7 @@ object GradientBoostedTreesRunner {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -174,7 +174,7 @@ class StreamingKMeans(

/** Set the decay factor directly (for forgetful algorithms). */
def setDecayFactor(a: Double): this.type = {
this.decayFactor = decayFactor
this.decayFactor = a
this
}

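
The StreamingKMeans fix above corrects a self-assignment; a minimal standalone sketch (class and method names are illustrative) of why the old setter silently did nothing:

```scala
object SelfAssignmentSketch {
  class Knob {
    var decayFactor: Double = 1.0
    // Old form: assigns the field to itself, so the argument `a` is ignored.
    def setDecayFactorBroken(a: Double): this.type = { this.decayFactor = decayFactor; this }
    // Fixed form: actually stores the argument.
    def setDecayFactorFixed(a: Double): this.type = { this.decayFactor = a; this }
  }

  def main(args: Array[String]): Unit = {
    val k = new Knob
    k.setDecayFactorBroken(2.0)
    println(k.decayFactor) // 1.0 -- the call had no effect
    k.setDecayFactorFixed(2.0)
    println(k.decayFactor) // 2.0
  }
}
```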
@@ -98,6 +98,8 @@ private[mllib] object NumericParser {
}
} else if (token == ")") {
parsing = false
} else if (token.trim.isEmpty){
// ignore whitespaces between delim chars, e.g. ", ["
} else {
// expecting a number
items.append(parseDouble(token))
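
Context for the NumericParser change above, as a hedged sketch (this is not the parser's actual implementation, only the tokenization idea): a delimiter-returning `java.util.StringTokenizer` hands back the blank run between ", " and "[" as its own token, and that is the token the new branch now skips.

```scala
import java.util.StringTokenizer

object WhitespaceTokenSketch {
  def main(args: Array[String]): Unit = {
    // returnDelims = true, so delimiters and the text between them both come back.
    val tokenizer = new StringTokenizer("(0.0, [1.0, 2.0])", "()[],", true)
    while (tokenizer.hasMoreTokens()) {
      println(s"token: '${tokenizer.nextToken()}'")
    }
    // Output includes a lone ' ' token between ',' and '[' -- previously such a
    // token reached the "expecting a number" branch and broke the parse.
  }
}
```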
@@ -133,6 +133,13 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase {
assert(math.abs(c1) ~== 0.8 absTol 0.6)
}

test("SPARK-7946 setDecayFactor") {
val kMeans = new StreamingKMeans()
assert(kMeans.decayFactor === 1.0)
kMeans.setDecayFactor(2.0)
assert(kMeans.decayFactor === 2.0)
}

def StreamingKMeansDataGenerator(
numPoints: Int,
numBatches: Int,
@@ -32,6 +32,11 @@ class LabeledPointSuite extends FunSuite {
}
}

test("parse labeled points with whitespaces") {
val point = LabeledPoint.parse("(0.0, [1.0, 2.0])")
assert(point === LabeledPoint(0.0, Vectors.dense(1.0, 2.0)))
}

test("parse labeled points with v0.9 format") {
val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0")
assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0)))
@@ -39,4 +39,11 @@ class NumericParserSuite extends FunSuite {
}
}
}

test("parser with whitespaces") {
val s = "(0.0, [1.0, 2.0])"
val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]]
assert(parsed(0).asInstanceOf[Double] === 0.0)
assert(parsed(1).asInstanceOf[Array[Double]] === Array(1.0, 2.0))
}
}
4 changes: 3 additions & 1 deletion python/pyspark/mllib/__init__.py
@@ -22,7 +22,9 @@
# MLlib currently needs NumPy 1.4+, so complain if lower

import numpy
if numpy.version.version < '1.4':

ver = [int(x) for x in numpy.version.version.split('.')[:2]]
if ver < [1, 4]:
raise Exception("MLlib requires NumPy 1.4+")

__all__ = ['classification', 'clustering', 'feature', 'linalg', 'random',
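
The `__init__.py` change above replaces a string comparison with a component-wise one. A standalone sketch (written in Scala to match the rest of this PR; the helper names are illustrative) of why that matters: lexicographic order says "1.10" < "1.4", which would wrongly reject NumPy 1.10 and later.

```scala
object VersionCompareSketch {
  // Take the first two numeric components, mirroring the Python fix.
  def components(v: String): Seq[Int] = v.split('.').take(2).map(_.toInt).toSeq

  def lessThan(a: Seq[Int], b: Seq[Int]): Boolean =
    a.zip(b).find { case (x, y) => x != y } match {
      case Some((x, y)) => x < y
      case None         => a.length < b.length
    }

  def main(args: Array[String]): Unit = {
    println("1.10" < "1.4")                                  // true: string order gives the wrong answer
    println(lessThan(components("1.10"), components("1.4"))) // false: 1.10 is not below 1.4
  }
}
```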
6 changes: 5 additions & 1 deletion sbin/start-master.sh
@@ -22,6 +22,8 @@
sbin="`dirname "$0"`"
sbin="`cd "$sbin"; pwd`"

ORIGINAL_ARGS="$@"

START_TACHYON=false

while (( "$#" )); do
@@ -53,7 +55,9 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
SPARK_MASTER_WEBUI_PORT=8080
fi

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
--ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS

if [ "$START_TACHYON" == "true" ]; then
"$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
14 changes: 13 additions & 1 deletion sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.sql.catalyst.SparkSQLParser
import org.apache.spark.sql.catalyst.analysis._
@@ -224,12 +225,23 @@ private[hive] object HiveQl {
* Otherwise, there will be a NullPointerException
* when retrieving properties from HiveConf.
*/
val hContext = new Context(new HiveConf())
val hContext = new Context(hiveConf)
val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
hContext.clear()
node
}

/**
* Returns the HiveConf
*/
private[this] def hiveConf(): HiveConf = {
val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here
if (ss == null) {
new HiveConf()
} else {
ss.getConf
}
}

/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = hqlParser(sql)
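
The `hiveConf()` helper above falls back to a fresh `HiveConf` when no `SessionState` is attached to the current thread. An equivalent formulation with `Option`, shown only as an illustration of the design choice (the PR itself keeps the explicit if/else; this assumes hive-exec on the classpath and the object name is illustrative):

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

object HiveConfFallbackSketch {
  // SessionState.get() may return null before a session is started on this thread,
  // so wrap it in Option and fall back to a freshly constructed HiveConf.
  def hiveConf(): HiveConf =
    Option(SessionState.get()).map(_.getConf).getOrElse(new HiveConf())
}
```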