Commit cedcc6f

2 parents adf4924 + d61f2c1 commit cedcc6f

48 files changed, +343 -128 lines changed


bagel/src/test/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
 log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.eclipse.jetty=WARN

bin/spark-sql

Lines changed: 1 addition & 11 deletions
@@ -24,7 +24,6 @@
 set -o posix

 CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-CLASS_NOT_FOUND_EXIT_STATUS=101

 # Figure out where Spark is installed
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
@@ -53,13 +52,4 @@ source "$FWDIR"/bin/utils.sh
 SUBMIT_USAGE_FUNCTION=usage
 gatherSparkSubmitOpts "$@"

-"$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
-exit_status=$?
-
-if [[ exit_status -eq CLASS_NOT_FOUND_EXIT_STATUS ]]; then
-  echo
-  echo "Failed to load Spark SQL CLI main class $CLASS."
-  echo "You need to build Spark with -Phive."
-fi
-
-exit $exit_status
+exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"

core/pom.xml

Lines changed: 11 additions & 0 deletions
@@ -322,6 +322,17 @@
         </tasks>
       </configuration>
     </plugin>
+    <plugin>
+      <artifactId>maven-clean-plugin</artifactId>
+      <configuration>
+        <filesets>
+          <fileset>
+            <directory>${basedir}/../python/build</directory>
+          </fileset>
+        </filesets>
+        <verbose>true</verbose>
+      </configuration>
+    </plugin>
     <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-shade-plugin</artifactId>

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 21 additions & 13 deletions
@@ -339,26 +339,34 @@ private[spark] object PythonRDD extends Logging {
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
       JavaRDD[Array[Byte]] = {
     val file = new DataInputStream(new FileInputStream(filename))
-    val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
     try {
-      while (true) {
-        val length = file.readInt()
-        val obj = new Array[Byte](length)
-        file.readFully(obj)
-        objs.append(obj)
+      val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
+      try {
+        while (true) {
+          val length = file.readInt()
+          val obj = new Array[Byte](length)
+          file.readFully(obj)
+          objs.append(obj)
+        }
+      } catch {
+        case eof: EOFException => {}
       }
-    } catch {
-      case eof: EOFException => {}
+      JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
+    } finally {
+      file.close()
     }
-    JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

   def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
     val file = new DataInputStream(new FileInputStream(filename))
-    val length = file.readInt()
-    val obj = new Array[Byte](length)
-    file.readFully(obj)
-    sc.broadcast(obj)
+    try {
+      val length = file.readInt()
+      val obj = new Array[Byte](length)
+      file.readFully(obj)
+      sc.broadcast(obj)
+    } finally {
+      file.close()
+    }
   }

   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
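
The change above wraps both readers in try/finally so the input stream is closed on every path, including when parallelize or broadcast throws. A minimal generic sketch of the same close-on-all-paths pattern (the withFile helper is hypothetical, not part of this commit):

    import java.io.{DataInputStream, FileInputStream}

    // Open the file, run the body, and close the stream whether the body
    // returns normally or throws.
    def withFile[T](filename: String)(body: DataInputStream => T): T = {
      val in = new DataInputStream(new FileInputStream(filename))
      try body(in) finally in.close()
    }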

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 0 deletions
@@ -320,6 +320,10 @@ object SparkSubmit {
     } catch {
       case e: ClassNotFoundException =>
         e.printStackTrace(printStream)
+        if (childMainClass.contains("thriftserver")) {
+          println(s"Failed to load main class $childMainClass.")
+          println("You need to build Spark with -Phive.")
+        }
         System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
     }

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 5 additions & 2 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler

 import java.nio.ByteBuffer

+import scala.util.control.NonFatal
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.serializer.SerializerInstance
@@ -32,7 +34,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul

   private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
-    THREADS, "Result resolver thread")
+    THREADS, "task-result-getter")

   protected val serializer = new ThreadLocal[SerializerInstance] {
     override def initialValue(): SerializerInstance = {
@@ -70,7 +72,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
       case cnf: ClassNotFoundException =>
         val loader = Thread.currentThread.getContextClassLoader
         taskSetManager.abort("ClassNotFound with classloader: " + loader)
-      case ex: Exception =>
+      // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
+      case NonFatal(ex) =>
         logError("Exception while getting task result", ex)
         taskSetManager.abort("Exception while getting task result: %s".format(ex))
     }
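
For context on the NonFatal change above: scala.util.control.NonFatal is an extractor that matches ordinary exceptions but not fatal errors or control-flow throwables, such as the one the compiler uses to implement a return from inside a closure. A minimal sketch of how it is typically used (the attempt helper is illustrative, not from this commit):

    import scala.util.control.NonFatal

    // Runs the body and recovers from non-fatal exceptions only; fatal errors
    // and control-flow throwables keep propagating.
    def attempt[T](body: => T): Option[T] =
      try Some(body) catch {
        case NonFatal(e) =>
          println(s"recovered from: $e")
          None
      }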

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 4 additions & 1 deletion
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}

+import org.eclipse.jetty.util.MultiException
+
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
@@ -1437,7 +1439,7 @@ private[spark] object Utils extends Logging {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
-      val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
+      val tryPort = if (startPort == 0) startPort else (startPort + offset) % (65536 - 1024) + 1024
       try {
         val (service, port) = startService(tryPort)
         logInfo(s"Successfully started service$serviceString on port $port.")
@@ -1470,6 +1472,7 @@ private[spark] object Utils extends Logging {
         return true
       }
       isBindCollision(e.getCause)
+    case e: MultiException => e.getThrowables.exists(isBindCollision)
     case e: Exception => isBindCollision(e.getCause)
     case _ => false
   }
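
A note on the port arithmetic above: the new expression takes the value modulo 65536 - 1024 and adds 1024 back, so the computed port always falls in 1024..65535 and a wrapped retry can no longer land on a privileged port. A small sketch of the arithmetic (the tryPort wrapper is just for illustration, not part of the commit):

    // Result is always in the non-privileged range 1024..65535.
    def tryPort(startPort: Int, offset: Int): Int =
      if (startPort == 0) startPort
      else (startPort + offset) % (65536 - 1024) + 1024

    // Old expression: (65000 + 600) % 65536 == 64, a privileged port.
    // New expression:  tryPort(65000, 600) == 2112.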

core/src/test/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
 log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.eclipse.jetty=WARN

examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java

Lines changed: 2 additions & 1 deletion
@@ -31,7 +31,6 @@
  * Usage: JavaSparkPi [slices]
  */
 public final class JavaSparkPi {
-

   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
@@ -61,5 +60,7 @@ public Integer call(Integer integer, Integer integer2) {
     });

     System.out.println("Pi is roughly " + 4.0 * count / n);
+
+    jsc.stop();
   }
 }

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java

Lines changed: 8 additions & 1 deletion
@@ -61,7 +61,8 @@ public static void main(String[] args) throws Exception {
     // Load a text file and convert each line to a Java Bean.
     JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
       new Function<String, Person>() {
-        public Person call(String line) throws Exception {
+        @Override
+        public Person call(String line) {
           String[] parts = line.split(",");

           Person person = new Person();
@@ -82,6 +83,7 @@ public Person call(String line) throws Exception {
     // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
     List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) {
         return "Name: " + row.getString(0);
       }
@@ -104,6 +106,7 @@ public String call(Row row) {
     JavaSchemaRDD teenagers2 =
       sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
     teenagerNames = teenagers2.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) {
         return "Name: " + row.getString(0);
       }
@@ -136,6 +139,7 @@ public String call(Row row) {
     // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
     teenagerNames = teenagers3.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) { return "Name: " + row.getString(0); }
     }).collect();
     for (String name: teenagerNames) {
@@ -162,12 +166,15 @@ public String call(Row row) {

     JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
     List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) {
         return "Name: " + row.getString(0) + ", City: " + row.getString(1);
       }
     }).collect();
     for (String name: nameAndCity) {
       System.out.println(name);
     }
+
+    ctx.stop();
   }
 }
