Skip to content

Commit 0dd3947

Browse files
committed
Merge remote-tracking branch 'upstream/master' into ldaonline
2 parents 0d0f3ee + 31d435e commit 0dd3947

File tree

52 files changed

+3903
-2893
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

52 files changed

+3903
-2893
lines changed

build/sbt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,32 @@ loadConfigFile() {
125125
[[ -f "$etc_sbt_opts_file" ]] && set -- $(loadConfigFile "$etc_sbt_opts_file") "$@"
126126
[[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@"
127127

128+
exit_status=127
129+
saved_stty=""
130+
131+
restoreSttySettings() {
132+
stty $saved_stty
133+
saved_stty=""
134+
}
135+
136+
onExit() {
137+
if [[ "$saved_stty" != "" ]]; then
138+
restoreSttySettings
139+
fi
140+
exit $exit_status
141+
}
142+
143+
saveSttySettings() {
144+
saved_stty=$(stty -g 2>/dev/null)
145+
if [[ ! $? ]]; then
146+
saved_stty=""
147+
fi
148+
}
149+
150+
saveSttySettings
151+
trap onExit INT
152+
128153
run "$@"
154+
155+
exit_status=$?
156+
onExit

build/sbt-launch-lib.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ execRunner () {
8181
echo ""
8282
}
8383

84-
exec "$@"
84+
"$@"
8585
}
8686

8787
addJava () {

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,6 +1420,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
14201420
val callSite = getCallSite
14211421
val cleanedFunc = clean(func)
14221422
logInfo("Starting job: " + callSite.shortForm)
1423+
if (conf.getBoolean("spark.logLineage", false)) {
1424+
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
1425+
}
14231426
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
14241427
resultHandler, localProperties.get)
14251428
progressBar.foreach(_.finishAll())

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ private[spark] class Master(
671671

672672
def registerApplication(app: ApplicationInfo): Unit = {
673673
val appAddress = app.driver.path.address
674-
if (addressToWorker.contains(appAddress)) {
674+
if (addressToApp.contains(appAddress)) {
675675
logInfo("Attempted to re-register application at same address: " + appAddress)
676676
return
677677
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
314314
* Return whether the request is acknowledged.
315315
*/
316316
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
317+
if (numAdditionalExecutors < 0) {
318+
throw new IllegalArgumentException(
319+
"Attempted to request a negative number of additional executor(s) " +
320+
s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
321+
}
317322
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
318323
logDebug(s"Number of pending executors is now $numPendingExecutors")
319324
numPendingExecutors += numAdditionalExecutors

data/mllib/sample_lda_data.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
1 2 6 0 2 3 1 1 0 0 3
2+
1 3 0 1 3 0 0 2 0 0 1
3+
1 4 1 0 0 4 9 0 1 2 0
4+
2 1 0 3 0 0 5 0 2 3 9
5+
3 1 1 9 3 0 2 0 0 1 3
6+
4 2 0 3 4 5 1 1 1 4 0
7+
2 1 0 3 0 0 5 0 2 2 9
8+
1 1 1 9 2 1 2 0 0 1 3
9+
4 4 0 3 4 2 1 3 0 0 0
10+
2 8 2 0 3 0 2 0 2 7 2
11+
1 1 1 9 0 2 2 0 0 3 3
12+
4 1 0 0 4 5 1 3 0 1 0

dev/run-tests

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ function handle_error () {
3636
}
3737

3838

39-
# Build against the right verison of Hadoop.
39+
# Build against the right version of Hadoop.
4040
{
4141
if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then
4242
if [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop1.0" ]; then
@@ -77,7 +77,7 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
7777
fi
7878
}
7979

80-
# Only run Hive tests if there are sql changes.
80+
# Only run Hive tests if there are SQL changes.
8181
# Partial solution for SPARK-1455.
8282
if [ -n "$AMPLAB_JENKINS" ]; then
8383
git fetch origin master:master
@@ -183,7 +183,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
183183
if [ -n "$_SQL_TESTS_ONLY" ]; then
184184
# This must be an array of individual arguments. Otherwise, having one long string
185185
# will be interpreted as a single test, which doesn't work.
186-
SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "mllib/test")
186+
SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test" "mllib/test")
187187
else
188188
SBT_MAVEN_TEST_ARGS=("test")
189189
fi

docs/mllib-clustering.md

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ has the following parameters:
5555

5656
Power iteration clustering is a scalable and efficient algorithm for clustering points given pointwise mutual affinity values. Internally the algorithm:
5757

58-
* accepts a [Graph](https://spark.apache.org/docs/0.9.2/api/graphx/index.html#org.apache.spark.graphx.Graph) that represents a normalized pairwise affinity between all input points.
58+
* accepts a [Graph](api/graphx/index.html#org.apache.spark.graphx.Graph) that represents a normalized pairwise affinity between all input points.
5959
* calculates the principal eigenvalue and eigenvector
6060
* Clusters each of the input points according to their principal eigenvector component value
6161

@@ -71,6 +71,35 @@ Example outputs for a dataset inspired by the paper - but with five clusters ins
7171
<!-- Images are downsized intentionally to improve quality on retina displays -->
7272
</p>
7373

74+
### Latent Dirichlet Allocation (LDA)
75+
76+
[Latent Dirichlet Allocation (LDA)](http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation)
77+
is a topic model which infers topics from a collection of text documents.
78+
LDA can be thought of as a clustering algorithm as follows:
79+
80+
* Topics correspond to cluster centers, and documents correspond to examples (rows) in a dataset.
81+
* Topics and documents both exist in a feature space, where feature vectors are vectors of word counts.
82+
* Rather than estimating a clustering using a traditional distance, LDA uses a function based
83+
on a statistical model of how text documents are generated.
84+
85+
LDA takes in a collection of documents as vectors of word counts.
86+
It learns clustering using [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
87+
on the likelihood function. After fitting on the documents, LDA provides:
88+
89+
* Topics: Inferred topics, each of which is a probability distribution over terms (words).
90+
* Topic distributions for documents: For each document in the training set, LDA gives a probability distribution over topics.
91+
92+
LDA takes the following parameters:
93+
94+
* `k`: Number of topics (i.e., cluster centers)
95+
* `maxIterations`: Limit on the number of iterations of EM used for learning
96+
* `docConcentration`: Hyperparameter for prior over documents' distributions over topics. Currently must be > 1, where larger values encourage smoother inferred distributions.
97+
* `topicConcentration`: Hyperparameter for prior over topics' distributions over terms (words). Currently must be > 1, where larger values encourage smoother inferred distributions.
98+
* `checkpointInterval`: If using checkpointing (set in the Spark configuration), this parameter specifies the frequency with which checkpoints will be created. If `maxIterations` is large, using checkpointing can help reduce shuffle file sizes on disk and help with failure recovery.
99+
100+
*Note*: LDA is a new feature with some missing functionality. In particular, it does not yet
101+
support prediction on new documents, and it does not have a Python API. These will be added in the future.
102+
74103
### Examples
75104

76105
#### k-means
@@ -293,6 +322,104 @@ for i in range(2):
293322

294323
</div>
295324

325+
#### Latent Dirichlet Allocation (LDA) Example
326+
327+
In the following example, we load word count vectors representing a corpus of documents.
328+
We then use [LDA](api/scala/index.html#org.apache.spark.mllib.clustering.LDA)
329+
to infer three topics from the documents. The number of desired clusters is passed
330+
to the algorithm. We then output the topics, represented as probability distributions over words.
331+
332+
<div class="codetabs">
333+
<div data-lang="scala" markdown="1">
334+
335+
{% highlight scala %}
336+
import org.apache.spark.mllib.clustering.LDA
337+
import org.apache.spark.mllib.linalg.Vectors
338+
339+
// Load and parse the data
340+
val data = sc.textFile("data/mllib/sample_lda_data.txt")
341+
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
342+
// Index documents with unique IDs
343+
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
344+
345+
// Cluster the documents into three topics using LDA
346+
val ldaModel = new LDA().setK(3).run(corpus)
347+
348+
// Output topics. Each is a distribution over words (matching word count vectors)
349+
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
350+
val topics = ldaModel.topicsMatrix
351+
for (topic <- Range(0, 3)) {
352+
print("Topic " + topic + ":")
353+
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
354+
println()
355+
}
356+
{% endhighlight %}
357+
</div>
358+
359+
<div data-lang="java" markdown="1">
360+
{% highlight java %}
361+
import scala.Tuple2;
362+
363+
import org.apache.spark.api.java.*;
364+
import org.apache.spark.api.java.function.Function;
365+
import org.apache.spark.mllib.clustering.DistributedLDAModel;
366+
import org.apache.spark.mllib.clustering.LDA;
367+
import org.apache.spark.mllib.linalg.Matrix;
368+
import org.apache.spark.mllib.linalg.Vector;
369+
import org.apache.spark.mllib.linalg.Vectors;
370+
import org.apache.spark.SparkConf;
371+
372+
public class JavaLDAExample {
373+
public static void main(String[] args) {
374+
SparkConf conf = new SparkConf().setAppName("LDA Example");
375+
JavaSparkContext sc = new JavaSparkContext(conf);
376+
377+
// Load and parse the data
378+
String path = "data/mllib/sample_lda_data.txt";
379+
JavaRDD<String> data = sc.textFile(path);
380+
JavaRDD<Vector> parsedData = data.map(
381+
new Function<String, Vector>() {
382+
public Vector call(String s) {
383+
String[] sarray = s.trim().split(" ");
384+
double[] values = new double[sarray.length];
385+
for (int i = 0; i < sarray.length; i++)
386+
values[i] = Double.parseDouble(sarray[i]);
387+
return Vectors.dense(values);
388+
}
389+
}
390+
);
391+
// Index documents with unique IDs
392+
JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
393+
new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
394+
public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
395+
return doc_id.swap();
396+
}
397+
}
398+
));
399+
corpus.cache();
400+
401+
// Cluster the documents into three topics using LDA
402+
DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);
403+
404+
// Output topics. Each is a distribution over words (matching word count vectors)
405+
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
406+
+ " words):");
407+
Matrix topics = ldaModel.topicsMatrix();
408+
for (int topic = 0; topic < 3; topic++) {
409+
System.out.print("Topic " + topic + ":");
410+
for (int word = 0; word < ldaModel.vocabSize(); word++) {
411+
System.out.print(" " + topics.apply(word, topic));
412+
}
413+
System.out.println();
414+
}
415+
}
416+
}
417+
{% endhighlight %}
418+
</div>
419+
420+
</div>
421+
422+
296423
In order to run the above application, follow the instructions
297424
provided in the [Self-Contained Applications](quick-start.html#self-contained-applications)
298425
section of the Spark

ec2/spark_ec2.py

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import sys
3535
import tarfile
3636
import tempfile
37+
import textwrap
3738
import time
3839
import urllib2
3940
import warnings
@@ -61,10 +62,10 @@
6162

6263
DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
6364
DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
64-
MESOS_SPARK_EC2_BRANCH = "branch-1.3"
6565

66-
# A URL prefix from which to fetch AMI information
67-
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH)
66+
# Default location to get the spark-ec2 scripts (and ami-list) from
67+
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
68+
DEFAULT_SPARK_EC2_BRANCH = "branch-1.3"
6869

6970

7071
def setup_boto():
@@ -146,6 +147,14 @@ def parse_args():
146147
"--spark-git-repo",
147148
default=DEFAULT_SPARK_GITHUB_REPO,
148149
help="Github repo from which to checkout supplied commit hash (default: %default)")
150+
parser.add_option(
151+
"--spark-ec2-git-repo",
152+
default=DEFAULT_SPARK_EC2_GITHUB_REPO,
153+
help="Github repo from which to checkout spark-ec2 (default: %default)")
154+
parser.add_option(
155+
"--spark-ec2-git-branch",
156+
default=DEFAULT_SPARK_EC2_BRANCH,
157+
help="Github repo branch of spark-ec2 to use (default: %default)")
149158
parser.add_option(
150159
"--hadoop-major-version", default="1",
151160
help="Major version of Hadoop (default: %default)")
@@ -332,7 +341,12 @@ def get_spark_ami(opts):
332341
print >> stderr,\
333342
"Don't recognize %s, assuming type is pvm" % opts.instance_type
334343

335-
ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type)
344+
# URL prefix from which to fetch AMI information
345+
ami_prefix = "{r}/{b}/ami-list".format(
346+
r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
347+
b=opts.spark_ec2_git_branch)
348+
349+
ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
336350
try:
337351
ami = urllib2.urlopen(ami_path).read().strip()
338352
print "Spark AMI: " + ami
@@ -649,12 +663,15 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
649663

650664
# NOTE: We should clone the repository before running deploy_files to
651665
# prevent ec2-variables.sh from being overwritten
666+
print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
667+
r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)
652668
ssh(
653669
host=master,
654670
opts=opts,
655671
command="rm -rf spark-ec2"
656672
+ " && "
657-
+ "git clone https://github.com/mesos/spark-ec2.git -b {b}".format(b=MESOS_SPARK_EC2_BRANCH)
673+
+ "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo,
674+
b=opts.spark_ec2_git_branch)
658675
)
659676

660677
print "Deploying files to master..."
@@ -681,21 +698,32 @@ def setup_spark_cluster(master, opts):
681698
print "Ganglia started at http://%s:5080/ganglia" % master
682699

683700

684-
def is_ssh_available(host, opts):
701+
def is_ssh_available(host, opts, print_ssh_output=True):
685702
"""
686703
Check if SSH is available on a host.
687704
"""
688-
try:
689-
with open(os.devnull, 'w') as devnull:
690-
ret = subprocess.check_call(
691-
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
692-
'%s@%s' % (opts.user, host), stringify_command('true')],
693-
stdout=devnull,
694-
stderr=devnull
695-
)
696-
return ret == 0
697-
except subprocess.CalledProcessError as e:
698-
return False
705+
s = subprocess.Popen(
706+
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
707+
'%s@%s' % (opts.user, host), stringify_command('true')],
708+
stdout=subprocess.PIPE,
709+
stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order
710+
)
711+
cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout
712+
713+
if s.returncode != 0 and print_ssh_output:
714+
# extra leading newline is for spacing in wait_for_cluster_state()
715+
print textwrap.dedent("""\n
716+
Warning: SSH connection error. (This could be temporary.)
717+
Host: {h}
718+
SSH return code: {r}
719+
SSH output: {o}
720+
""").format(
721+
h=host,
722+
r=s.returncode,
723+
o=cmd_output.strip()
724+
)
725+
726+
return s.returncode == 0
699727

700728

701729
def is_cluster_ssh_available(cluster_instances, opts):
@@ -1026,6 +1054,17 @@ def real_main():
10261054
print >> stderr, "ebs-vol-num cannot be greater than 8"
10271055
sys.exit(1)
10281056

1057+
# Prevent breaking ami_prefix (/, .git and startswith checks)
1058+
# Prevent forks with non spark-ec2 names for now.
1059+
if opts.spark_ec2_git_repo.endswith("/") or \
1060+
opts.spark_ec2_git_repo.endswith(".git") or \
1061+
not opts.spark_ec2_git_repo.startswith("https://github.com") or \
1062+
not opts.spark_ec2_git_repo.endswith("spark-ec2"):
1063+
print >> stderr, "spark-ec2-git-repo must be a github repo and it must not have a " \
1064+
"trailing / or .git. " \
1065+
"Furthermore, we currently only support forks named spark-ec2."
1066+
sys.exit(1)
1067+
10291068
try:
10301069
conn = ec2.connect_to_region(opts.region)
10311070
except Exception as e:

0 commit comments

Comments (0)