Commit 711d1c6

Merge remote-tracking branch 'apache/master' into SPARK-7752

2 parents: 40ae53e + a70bf06

58 files changed: +1522 additions, −291 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions

@@ -1159,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
     withScope {
       assertNotStopped()
-      val kc = kcf()
-      val vc = vcf()
+      val kc = clean(kcf)()
+      val vc = clean(vcf)()
       val format = classOf[SequenceFileInputFormat[Writable, Writable]]
       val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
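
The change above routes the WritableConverter factories through SparkContext's private clean() helper (backed by Spark's ClosureCleaner) before invoking them, so a factory closure that captures a non-serializable enclosing object is scrubbed before it reaches the task closure. A hedged sketch of the kind of failure mode this guards against; Holder and the path are hypothetical, and whether the outer reference is actually captured depends on the Scala version:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical repro: Holder is not Serializable. The implicit
    // WritableConverter factories for sequenceFile are materialized at this
    // call site and, before this patch, could retain a reference to the
    // enclosing Holder instance and fail with NotSerializableException once
    // the job ran. clean() nulls out such unused outer references.
    class Holder(sc: SparkContext) {
      def pairs(path: String): Array[(String, Int)] =
        sc.sequenceFile[String, Int](path).collect()
    }

    object Repro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local"))
        try new Holder(sc).pairs("/tmp/example.seq").foreach(println)
        finally sc.stop()
      }
    }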

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 3 additions & 2 deletions

@@ -25,7 +25,8 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
+  UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}

@@ -125,7 +126,7 @@ class HistoryServer(
   def initialize() {
     attachPage(new HistoryPage(this))

-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))

     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 3 additions & 2 deletions

@@ -19,7 +19,8 @@ package org.apache.spark.deploy.master.ui

 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationsListResource, ApplicationInfo,
+  UIRoot}
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.RpcUtils

@@ -47,7 +48,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     attachPage(new HistoryNotFoundPage(this))
     attachPage(masterPage)
     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))
     attachHandler(createRedirectHandler(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 2 deletions

@@ -296,6 +296,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
+    val cleanedF = self.sparkContext.clean(func)

    if (keyClass.isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")

@@ -305,15 +306,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
      val map = new JHashMap[K, V]
      iter.foreach { pair =>
        val old = map.get(pair._1)
-        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
      }
      Iterator(map)
    } : Iterator[JHashMap[K, V]]

    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
      m2.foreach { pair =>
        val old = m1.get(pair._1)
-        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
      }
      m1
    } : JHashMap[K, V]
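
Same closure-cleaning treatment here: the user-supplied reduce function is cleaned once up front, and the cleaned copy (cleanedF) is used in both the per-partition fold and the map merge. For reference, a small usage example of the public API this protects, assuming an existing SparkContext named sc:

    // reduceByKeyLocally merges the values for each key and returns the
    // result to the driver as a scala.collection.Map rather than an RDD.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val merged: scala.collection.Map[String, Int] = pairs.reduceByKeyLocally(_ + _)
    // merged == Map("a" -> 4, "b" -> 2)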

core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ import scala.util.control.NonFatal

 import org.apache.spark.Logging

-private[serializer] object SerializationDebugger extends Logging {
+private[spark] object SerializationDebugger extends Logging {

   /**
    * Improve the given NotSerializableException with the serialization path leading from the given
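
Widening the visibility from private[serializer] to private[spark] lets Spark internals outside the serializer package attach a serialization trace to their errors. A hedged sketch of the kind of call this enables; the caller must live under the org.apache.spark package, and the improveException signature shown is my reading of the 1.4-era source:

    package org.apache.spark

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    import org.apache.spark.serializer.SerializationDebugger

    object SerializationCheck {
      // Serialize obj with plain Java serialization; on failure, rethrow the
      // exception enriched with the reference path that led to the
      // non-serializable object.
      def serializeOrExplain(obj: Any): Array[Byte] = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        try {
          out.writeObject(obj)
          out.flush()
          bytes.toByteArray
        } catch {
          case e: NotSerializableException =>
            throw SerializationDebugger.improveException(obj, e)
        } finally {
          out.close()
        }
      }
    }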

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala renamed to core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 4 additions & 4 deletions

@@ -39,7 +39,7 @@ import org.apache.spark.ui.SparkUI
  * HistoryServerSuite.
  */
 @Path("/v1")
-private[v1] class JsonRootResource extends UIRootFromServletContext {
+private[v1] class ApiRootResource extends UIRootFromServletContext {

   @Path("applications")
   def getApplicationList(): ApplicationListResource = {

@@ -166,11 +166,11 @@ private[v1] class JsonRootResource extends UIRootFromServletContext

 }

-private[spark] object JsonRootResource {
+private[spark] object ApiRootResource {

-  def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+  def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
     val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
-    jerseyContext.setContextPath("/json")
+    jerseyContext.setContextPath("/api")
     val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
     holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
       "com.sun.jersey.api.core.PackagesResourceConfig")

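Net effect of the rename: the REST root moves from /json/v1 to /api/v1, and the servlet factory is named for the API rather than its serialization format. A quick smoke test against a locally running application UI; port 4040 is the default, and this assumes such an application is up (the history server exposes the same resources on its own port):

    import scala.io.Source

    // Fetch the application list from the relocated REST root.
    val json = Source.fromURL("http://localhost:4040/api/v1/applications").mkString
    println(json) // a JSON array of application records
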
core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 3 additions & 2 deletions

@@ -19,7 +19,8 @@ package org.apache.spark.ui

 import java.util.Date

-import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
+  UIRoot}
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener

@@ -64,7 +65,7 @@ private[spark] class SparkUI private (
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
     attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))
     // This should be POST only, but, the YARN AM proxy won't proxy POSTs
     attachHandler(createRedirectHandler(
       "/stages/stage/kill", "/stages", stagesTab.handleKillRequest,

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 15 additions & 0 deletions

@@ -26,6 +26,7 @@
 import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;

+import com.google.common.collect.Iterators;
 import com.google.common.collect.HashMultiset;
 import com.google.common.io.ByteStreams;
 import org.junit.After;

@@ -252,6 +253,20 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
     createWriter(false).stop(false);
   }

+  @Test
+  public void writeEmptyIterator() throws Exception {
+    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+    writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+    final Option<MapStatus> mapStatus = writer.stop(true);
+    assertTrue(mapStatus.isDefined());
+    assertTrue(mergedOutputFile.exists());
+    assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
+    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten());
+    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten());
+    assertEquals(0, taskMetrics.diskBytesSpilled());
+    assertEquals(0, taskMetrics.memoryBytesSpilled());
+  }
+
   @Test
   public void writeWithoutSpilling() throws Exception {
     // In this example, each partition should have exactly one record:

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -198,11 +198,11 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
   }

   def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
-    HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/json/v1/$path"))
+    HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
   }

   def getUrl(path: String): String = {
-    HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/json/v1/$path"))
+    HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path"))
   }

   def generateExpectation(name: String, path: String): Unit = {

core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala

Lines changed: 8 additions & 8 deletions

@@ -497,7 +497,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       goToUi(sc, "/jobs/job/?id=7")
       find("no-info").get.text should be ("No information to display for job 7")

-      val badJob = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get, "jobs/7"))
+      val badJob = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "jobs/7"))
       badJob._1 should be (HttpServletResponse.SC_NOT_FOUND)
       badJob._2 should be (None)
       badJob._3 should be (Some("unknown job: 7"))

@@ -540,18 +540,18 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before

       goToUi(sc, "/stages/stage/?id=12&attempt=0")
       find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)")
-      val badStage = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/12/0"))
+      val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/12/0"))
       badStage._1 should be (HttpServletResponse.SC_NOT_FOUND)
       badStage._2 should be (None)
       badStage._3 should be (Some("unknown stage: 12"))

-      val badAttempt = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/19/15"))
+      val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/19/15"))
       badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND)
       badAttempt._2 should be (None)
       badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]"))

       val badStageAttemptList = HistoryServerSuite.getContentAndCode(
-        jsonUrl(sc.ui.get, "stages/12"))
+        apiUrl(sc.ui.get, "stages/12"))
       badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND)
       badStageAttemptList._2 should be (None)
       badStageAttemptList._3 should be (Some("unknown stage: 12"))

@@ -561,7 +561,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
   test("live UI json application list") {
     withSpark(newSparkContext()) { sc =>
       val appListRawJson = HistoryServerSuite.getUrl(new URL(
-        sc.ui.get.appUIAddress + "/json/v1/applications"))
+        sc.ui.get.appUIAddress + "/api/v1/applications"))
       val appListJsonAst = JsonMethods.parse(appListRawJson)
       appListJsonAst.children.length should be (1)
       val attempts = (appListJsonAst \ "attempts").children

@@ -587,10 +587,10 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
   }

   def getJson(ui: SparkUI, path: String): JValue = {
-    JsonMethods.parse(HistoryServerSuite.getUrl(jsonUrl(ui, path)))
+    JsonMethods.parse(HistoryServerSuite.getUrl(apiUrl(ui, path)))
   }

-  def jsonUrl(ui: SparkUI, path: String): URL = {
-    new URL(ui.appUIAddress + "/json/v1/applications/test/" + path)
+  def apiUrl(ui: SparkUI, path: String): URL = {
+    new URL(ui.appUIAddress + "/api/v1/applications/test/" + path)
   }
 }
