
Commit 8fb02e5

Commit message: merged master
2 parents: 560d13b + 25c9b9f

File tree

40 files changed: +635 additions, -455 deletions


build/mvn

Lines changed: 4 additions & 1 deletion
@@ -21,6 +21,8 @@
 _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # Preserve the calling directory
 _CALLING_DIR="$(pwd)"
+# Options used during compilation
+_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

 # Installs any application tarball given a URL, the expected tarball name,
 # and, optionally, a checkable binary path to determine if the binary has
@@ -136,14 +138,15 @@ cd "${_CALLING_DIR}"
 # Now that zinc is ensured to be installed, check its status and, if its
 # not running or just installed, start it
 if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
+  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
   ${ZINC_BIN} -shutdown
   ${ZINC_BIN} -start -port ${ZINC_PORT} \
     -scala-compiler "${SCALA_COMPILER}" \
     -scala-library "${SCALA_LIBRARY}" &>/dev/null
 fi

 # Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"}
+export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

 # Last, call the `mvn` command as usual
 ${MVN_BIN} "$@"

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 4 additions & 17 deletions
@@ -144,24 +144,11 @@ private[spark] class PythonRDD(
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
           }
-
           // Check whether the worker is ready to be re-used.
-          if (reuse_worker) {
-            // It has a high possibility that the ending mark is already available,
-            // And current task should not be blocked by checking it
-
-            if (stream.available() >= 4) {
-              val ending = stream.readInt()
-              if (ending == SpecialLengths.END_OF_STREAM) {
-                env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                released = true
-                logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
-              } else {
-                logInfo(s"Communication with worker did not end cleanly " +
-                  s"(ending with $ending), close it: $worker")
-              }
-            } else {
-              logInfo(s"The ending mark from worker is not available, close it: $worker")
+          if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+            if (reuse_worker) {
+              env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+              released = true
             }
           }
           null

mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

Lines changed: 5 additions & 1 deletion
@@ -355,6 +355,10 @@ class LogisticRegressionWithLBFGS
   }

   override protected def createModel(weights: Vector, intercept: Double) = {
-    new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
+    if (numOfLinearPredictor == 1) {
+      new LogisticRegressionModel(weights, intercept)
+    } else {
+      new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
+    }
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

Lines changed: 4 additions & 3 deletions
@@ -126,7 +126,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   /**
    * The dimension of training features.
    */
-  protected var numFeatures: Int = 0
+  protected var numFeatures: Int = -1

   /**
    * Set if the algorithm should use feature scaling to improve the convergence during optimization.
@@ -163,7 +163,9 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   * RDD of LabeledPoint entries.
   */
  def run(input: RDD[LabeledPoint]): M = {
-    numFeatures = input.first().features.size
+    if (numFeatures < 0) {
+      numFeatures = input.map(_.features.size).first()
+    }

    /**
     * When `numOfLinearPredictor > 1`, the intercepts are encapsulated into weights,
@@ -193,7 +195,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   * of LabeledPoint entries starting from the initial weights provided.
   */
  def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
-    numFeatures = input.first().features.size

    if (input.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data is not directly cached, which may hurt performance if its"

project/SparkBuild.scala

Lines changed: 10 additions & 2 deletions
@@ -361,9 +361,16 @@ object Unidoc {
     publish := {},

     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn),
+      inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn),
+
+    // Skip actual catalyst, but include the subproject.
+    // Catalyst is not public API and contains quasiquotes which break scaladoc.
+    unidocAllSources in (ScalaUnidoc, unidoc) := {
+      (unidocAllSources in (ScalaUnidoc, unidoc)).value
+        .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+    },

     // Skip class names containing $ and some internal packages in Javadocs
     unidocAllSources in (JavaUnidoc, unidoc) := {
@@ -376,6 +383,7 @@ object Unidoc {
         .map(_.filterNot(_.getCanonicalPath.contains("executor")))
        .map(_.filterNot(_.getCanonicalPath.contains("python")))
        .map(_.filterNot(_.getCanonicalPath.contains("collection")))
+        .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
     },

     // Javadoc options: create a window title, and group key packages on index page

python/pyspark/context.py

Lines changed: 4 additions & 3 deletions
@@ -64,6 +64,8 @@ class SparkContext(object):
     _lock = Lock()
     _python_includes = None  # zip and egg files that need to be added to PYTHONPATH

+    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None, jsc=None, profiler_cls=BasicProfiler):
@@ -185,8 +187,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
-                if filename.lower().endswith("zip") or filename.lower().endswith("egg") \
-                        or filename.lower().endswith("jar"):
+                if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
                     self._python_includes.append(filename)
                     sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))

@@ -706,7 +707,7 @@ def addPyFile(self, path):
         self.addFile(path)
         (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

-        if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
+        if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
            self._python_includes.append(filename)
            # for tests in local mode
            sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
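With PACKAGE_EXTENSIONS in place, .jar archives passed through spark.submit.pyFiles or addPyFile are put on the workers' PYTHONPATH just like .zip and .egg files. A minimal usage sketch, assuming a local SparkContext; the app name and file paths below are hypothetical placeholders:

from pyspark import SparkContext

sc = SparkContext("local[2]", "package-extensions-demo")  # hypothetical app name

# Extensions are matched case-insensitively against PACKAGE_EXTENSIONS, so each
# archive below would land on the workers' PYTHONPATH (paths are hypothetical).
sc.addPyFile("/tmp/deps/helpers.zip")
sc.addPyFile("/tmp/deps/helpers.egg")
sc.addPyFile("/tmp/deps/udf-support.jar")  # .jar is newly accepted by this change

sc.stop()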

python/pyspark/sql/context.py

Lines changed: 14 additions & 14 deletions
@@ -252,7 +252,7 @@ def applySchema(self, rdd, schema):
         >>> schema = StructType([StructField("field1", IntegerType(), False),
         ...     StructField("field2", StringType(), False)])
         >>> df = sqlCtx.applySchema(rdd2, schema)
-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.sql("SELECT * from table1")
         >>> df2.collect()
         [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
@@ -405,17 +405,17 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):

         return self.applySchema(data, schema)

-    def registerRDDAsTable(self, rdd, tableName):
+    def registerDataFrameAsTable(self, rdd, tableName):
         """Registers the given RDD as a temporary table in the catalog.

         Temporary tables exist only during the lifetime of this instance of
         SQLContext.

-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         """
         if (rdd.__class__ is DataFrame):
             df = rdd._jdf
-            self._ssql_ctx.registerRDDAsTable(df, tableName)
+            self._ssql_ctx.registerDataFrameAsTable(df, tableName)
         else:
             raise ValueError("Can only register DataFrame as table")

@@ -456,7 +456,7 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
         ...     print>>ofn, json
         >>> ofn.close()
         >>> df1 = sqlCtx.jsonFile(jsonFile)
-        >>> sqlCtx.registerRDDAsTable(df1, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
         >>> df2 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table1")
@@ -467,7 +467,7 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
         Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)

         >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
-        >>> sqlCtx.registerRDDAsTable(df3, "table2")
+        >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
         >>> df4 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table2")
@@ -485,7 +485,7 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
         ...     StructField("field5",
         ...         ArrayType(IntegerType(), False), True)]), False)])
         >>> df5 = sqlCtx.jsonFile(jsonFile, schema)
-        >>> sqlCtx.registerRDDAsTable(df5, "table3")
+        >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
         >>> df6 = sqlCtx.sql(
         ...   "SELECT field2 AS f1, field3.field5 as f2, "
         ...   "field3.field5[0] as f3 from table3")
@@ -509,7 +509,7 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         determine the schema.

         >>> df1 = sqlCtx.jsonRDD(json)
-        >>> sqlCtx.registerRDDAsTable(df1, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
         >>> df2 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table1")
@@ -520,7 +520,7 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)

         >>> df3 = sqlCtx.jsonRDD(json, df1.schema)
-        >>> sqlCtx.registerRDDAsTable(df3, "table2")
+        >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
         >>> df4 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
         ...   "field6 as f4 from table2")
@@ -538,7 +538,7 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         ...     StructField("field5",
         ...         ArrayType(IntegerType(), False), True)]), False)])
         >>> df5 = sqlCtx.jsonRDD(json, schema)
-        >>> sqlCtx.registerRDDAsTable(df5, "table3")
+        >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
         >>> df6 = sqlCtx.sql(
         ...   "SELECT field2 AS f1, field3.field5 as f2, "
         ...   "field3.field5[0] as f3 from table3")
@@ -628,7 +628,7 @@ def createExternalTable(self, tableName, path=None, source=None,
     def sql(self, sqlQuery):
         """Return a L{DataFrame} representing the result of the given query.

-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
         >>> df2.collect()
         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
@@ -638,7 +638,7 @@ def sql(self, sqlQuery):
     def table(self, tableName):
         """Returns the specified table as a L{DataFrame}.

-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.table("table1")
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
@@ -653,7 +653,7 @@ def tables(self, dbName=None):
         The returned DataFrame has two columns, tableName and isTemporary
         (a column with BooleanType indicating if a table is a temporary one or not).

-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlCtx.tables()
         >>> df2.filter("tableName = 'table1'").first()
         Row(tableName=u'table1', isTemporary=True)
@@ -668,7 +668,7 @@ def tableNames(self, dbName=None):

         If `dbName` is not specified, the current database will be used.

-        >>> sqlCtx.registerRDDAsTable(df, "table1")
+        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
         >>> "table1" in sqlCtx.tableNames()
         True
         >>> "table1" in sqlCtx.tableNames("db")

python/pyspark/sql/dataframe.py

Lines changed: 54 additions & 11 deletions
@@ -238,6 +238,22 @@ def printSchema(self):
         """
         print (self._jdf.schema().treeString())

+    def explain(self, extended=False):
+        """
+        Prints the plans (logical and physical) to the console for
+        debugging purpose.
+
+        If extended is False, only prints the physical plan.
+        """
+        self._jdf.explain(extended)
+
+    def isLocal(self):
+        """
+        Returns True if the `collect` and `take` methods can be run locally
+        (without any Spark executors).
+        """
+        return self._jdf.isLocal()
+
     def show(self):
         """
         Print the first 20 rows.
@@ -247,14 +263,12 @@ def show(self):
         2   Alice
         5   Bob
         >>> df
-        age name
-        2   Alice
-        5   Bob
+        DataFrame[age: int, name: string]
         """
-        print (self)
+        print self._jdf.showString().encode('utf8', 'ignore')

     def __repr__(self):
-        return self._jdf.showString()
+        return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))

     def count(self):
         """Return the number of elements in this RDD.
@@ -336,13 +350,40 @@ def mapPartitions(self, f, preservesPartitioning=False):
         """
         Return a new RDD by applying a function to each partition.

+        It's a shorthand for df.rdd.mapPartitions()
+
         >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
         >>> def f(iterator): yield 1
         >>> rdd.mapPartitions(f).sum()
         4
         """
         return self.rdd.mapPartitions(f, preservesPartitioning)

+    def foreach(self, f):
+        """
+        Applies a function to all rows of this DataFrame.
+
+        It's a shorthand for df.rdd.foreach()
+
+        >>> def f(person):
+        ...     print person.name
+        >>> df.foreach(f)
+        """
+        return self.rdd.foreach(f)
+
+    def foreachPartition(self, f):
+        """
+        Applies a function to each partition of this DataFrame.
+
+        It's a shorthand for df.rdd.foreachPartition()
+
+        >>> def f(people):
+        ...     for person in people:
+        ...         print person.name
+        >>> df.foreachPartition(f)
+        """
+        return self.rdd.foreachPartition(f)
+
     def cache(self):
         """ Persist with the default storage level (C{MEMORY_ONLY_SER}).
         """
@@ -377,8 +418,13 @@ def repartition(self, numPartitions):
         """ Return a new :class:`DataFrame` that has exactly `numPartitions`
         partitions.
         """
-        rdd = self._jdf.repartition(numPartitions, None)
-        return DataFrame(rdd, self.sql_ctx)
+        return DataFrame(self._jdf.repartition(numPartitions, None), self.sql_ctx)
+
+    def distinct(self):
+        """
+        Return a new :class:`DataFrame` containing the distinct rows in this DataFrame.
+        """
+        return DataFrame(self._jdf.distinct(), self.sql_ctx)

     def sample(self, withReplacement, fraction, seed=None):
         """
@@ -957,10 +1003,7 @@ def cast(self, dataType):
             return Column(jc, self.sql_ctx)

     def __repr__(self):
-        if self._jdf.isComputable():
-            return self._jdf.samples()
-        else:
-            return 'Column<%s>' % self._jdf.toString()
+        return 'Column<%s>' % self._jdf.toString().encode('utf8')

     def toPandas(self):
         """
