
Commit fe0ee92

Merge remote-tracking branch 'apache/master' into SPARK-3530
2 parents 6e86d98 + 48a19a6 commit fe0ee92

45 files changed: +1256 / -155 lines


core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ $(function() {
         var column = "table ." + $(this).attr("name");
         $(column).hide();
     });
+    // Stripe table rows after rows have been hidden to ensure correct striping.
+    stripeTables();
 
     $("input:checkbox").click(function() {
         var column = "table ." + $(this).attr("name");

core/src/main/resources/org/apache/spark/ui/static/table.js

Lines changed: 0 additions & 5 deletions
@@ -28,8 +28,3 @@ function stripeTables() {
         });
     });
 }
-
-/* Stripe all tables after pages finish loading. */
-$(function() {
-    stripeTables();
-});

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 3 additions & 11 deletions
@@ -343,15 +343,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
    */
   def getSecretKey(): String = secretKey
 
-  override def getSaslUser(appId: String): String = {
-    val myAppId = sparkConf.getAppId
-    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
-    getSaslUser()
-  }
-
-  override def getSecretKey(appId: String): String = {
-    val myAppId = sparkConf.getAppId
-    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
-    getSecretKey()
-  }
+  // Default SecurityManager only has a single secret key, so ignore appId.
+  override def getSaslUser(appId: String): String = getSaslUser()
+  override def getSecretKey(appId: String): String = getSecretKey()
 }
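
The appId-keyed overloads exist so that code serving several applications (for example a shared shuffle service) could, in principle, resolve per-application secrets, while the standalone SecurityManager holds only its own application's secret and can safely ignore the argument. A minimal sketch of that contrast, using an illustrative trait rather than Spark's actual interface (all names below are invented):

trait AppSecretHolder {
  def getSaslUser(appId: String): String
  def getSecretKey(appId: String): String
}

// Single-application holder: one user and one secret, the appId is ignored.
class SingleAppSecretHolder(user: String, secret: String) extends AppSecretHolder {
  override def getSaslUser(appId: String): String = user
  override def getSecretKey(appId: String): String = secret
}

// Multi-application holder: secrets are looked up per appId; unknown apps are rejected.
class MultiAppSecretHolder(secrets: Map[String, (String, String)]) extends AppSecretHolder {
  private def entry(appId: String): (String, String) =
    secrets.getOrElse(appId, sys.error(s"Unknown appId $appId"))
  override def getSaslUser(appId: String): String = entry(appId)._1
  override def getSecretKey(appId: String): String = entry(appId)._2
}

object SecretHolderSketch {
  def main(args: Array[String]): Unit = {
    val single = new SingleAppSecretHolder("sparkSaslUser", "app-secret")
    println(single.getSecretKey("any-app-id"))   // appId ignored, same secret either way

    val multi = new MultiAppSecretHolder(Map(("app-1", ("sparkSaslUser", "secret-1"))))
    println(multi.getSecretKey("app-1"))
  }
}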

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 0 deletions
@@ -560,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
 
   /**
+   * :: Experimental ::
+   *
    * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    * (useful for binary data)
    *
@@ -602,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   }
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files
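
For readers new to these APIs, a minimal usage sketch of the two binary-input methods documented as experimental above; the directory paths and the 128-byte record length are invented for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object BinaryInputSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-input").setMaster("local[*]"))

    // binaryFiles: one (path, PortableDataStream) pair per file in the directory.
    val files = sc.binaryFiles("hdfs:///data/blobs")
    val sizes = files.map { case (path, stream) => (path, stream.toArray().length) }
    sizes.take(5).foreach { case (path, n) => println(s"$path: $n bytes") }

    // binaryRecords: every record is assumed to be exactly 128 bytes long.
    val records = sc.binaryRecords("hdfs:///data/records.bin", 128)
    println(s"record count: ${records.count()}")

    sc.stop()
  }
}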

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 7 additions & 5 deletions
@@ -21,18 +21,14 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
-import java.io.DataInputStream
-
-import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}
-
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.input.PortableDataStream
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
@@ -286,6 +282,8 @@ class JavaSparkContext(val sc: SparkContext)
     new JavaPairRDD(sc.binaryFiles(path, minPartitions))
 
   /**
+   * :: Experimental ::
+   *
    * Read a directory of binary files from HDFS, a local file system (available on all nodes),
    * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
    * record and returned in a key-value pair, where the key is the path of each file,
@@ -312,15 +310,19 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * @note Small files are preferred; very large files but may cause bad performance.
    */
+  @Experimental
   def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
     new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files
    * @return An RDD of data with values, represented as byte arrays
    */
+  @Experimental
   def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
     new JavaRDD(sc.binaryRecords(path, recordLength))
   }

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 29 additions & 16 deletions
@@ -21,6 +21,8 @@ import java.io._
 import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
+import org.apache.spark.input.PortableDataStream
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
@@ -395,22 +397,33 @@ private[spark] object PythonRDD extends Logging {
           newIter.asInstanceOf[Iterator[String]].foreach { str =>
             writeUTF(str, dataOut)
           }
-        case pair: Tuple2[_, _] =>
-          pair._1 match {
-            case bytePair: Array[Byte] =>
-              newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
-                dataOut.writeInt(pair._1.length)
-                dataOut.write(pair._1)
-                dataOut.writeInt(pair._2.length)
-                dataOut.write(pair._2)
-              }
-            case stringPair: String =>
-              newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
-                writeUTF(pair._1, dataOut)
-                writeUTF(pair._2, dataOut)
-              }
-            case other =>
-              throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
+        case stream: PortableDataStream =>
+          newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
+            val bytes = stream.toArray()
+            dataOut.writeInt(bytes.length)
+            dataOut.write(bytes)
+          }
+        case (key: String, stream: PortableDataStream) =>
+          newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
+            case (key, stream) =>
+              writeUTF(key, dataOut)
+              val bytes = stream.toArray()
+              dataOut.writeInt(bytes.length)
+              dataOut.write(bytes)
+          }
+        case (key: String, value: String) =>
+          newIter.asInstanceOf[Iterator[(String, String)]].foreach {
+            case (key, value) =>
+              writeUTF(key, dataOut)
+              writeUTF(value, dataOut)
+          }
+        case (key: Array[Byte], value: Array[Byte]) =>
+          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+            case (key, value) =>
+              dataOut.writeInt(key.length)
+              dataOut.write(key)
+              dataOut.writeInt(value.length)
+              dataOut.write(value)
           }
         case other =>
           throw new SparkException("Unexpected element type " + first.getClass)

core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.sasl.SaslRpcHandler
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+
+/**
+ * Provides a server from which Executors can read shuffle files (rather than reading directly from
+ * each other), to provide uninterrupted access to the files in the face of executors being turned
+ * off or killed.
+ *
+ * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
+ */
+private[worker]
+class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+  extends Logging {
+
+  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
+  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
+
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val transportContext: TransportContext = {
+    val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
+    new TransportContext(transportConf, handler)
+  }
+
+  private var server: TransportServer = _
+
+  /** Starts the external shuffle service if the user has configured us to. */
+  def startIfEnabled() {
+    if (enabled) {
+      require(server == null, "Shuffle server already started")
+      logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+      server = transportContext.createServer(port)
+    }
+  }
+
+  def stop() {
+    if (enabled && server != null) {
+      server.close()
+      server = null
+    }
+  }
+}
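
Because the class is private[worker], user code never constructs it directly; the Worker does (see the Worker.scala diff below). Purely as a sketch of the configuration surface, the two keys it reads could be wired up like this (everything except enabling the service uses the defaults shown above; the object name is invented, and it sits in the same package only because of the private[worker] modifier):

package org.apache.spark.deploy.worker

import org.apache.spark.{SecurityManager, SparkConf}

object ShuffleServiceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")   // default is false
      .set("spark.shuffle.service.port", "7337")      // 7337 is already the default

    val securityManager = new SecurityManager(conf)
    val shuffleService = new StandaloneWorkerShuffleService(conf, securityManager)

    shuffleService.startIfEnabled()                   // no-op when the service is not enabled
    sys.addShutdownHook { shuffleService.stop() }
  }
}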

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 7 additions & 1 deletion
@@ -111,6 +111,9 @@ private[spark] class Worker(
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
 
+  // The shuffle service is not actually started unless configured.
+  val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
@@ -154,6 +157,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    shuffleService.startIfEnabled()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
     registerWithMaster()
@@ -419,6 +423,7 @@ private[spark] class Worker(
     registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
+    shuffleService.stop()
     webUi.stop()
     metricsSystem.stop()
   }
@@ -441,7 +446,8 @@ private[spark] object Worker extends Logging {
       cores: Int,
       memory: Int,
       masterUrls: Array[String],
-      workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workDir: String,
+      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val conf = new SparkConf

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 17 additions & 4 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.network.client.{TransportClientBootstrap, RpcResponseCal
 import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
 import org.apache.spark.network.sasl.{SaslRpcHandler, SaslClientBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher}
+import org.apache.spark.network.shuffle.{RetryingBlockFetcher, BlockFetchingListener, OneForOneBlockFetcher}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.Utils
@@ -71,9 +71,22 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
       listener: BlockFetchingListener): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
-      val client = clientFactory.createClient(host, port)
-      new OneForOneBlockFetcher(client, blockIds.toArray, listener)
-        .start(OpenBlocks(blockIds.map(BlockId.apply)))
+      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
+        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
+          val client = clientFactory.createClient(host, port)
+          new OneForOneBlockFetcher(client, blockIds.toArray, listener)
+            .start(OpenBlocks(blockIds.map(BlockId.apply)))
+        }
+      }
+
+      val maxRetries = transportConf.maxIORetries()
+      if (maxRetries > 0) {
+        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
+        // a bug in this code. We should remove the if statement once we're sure of the stability.
+        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
+      } else {
+        blockFetchStarter.createAndStart(blockIds, listener)
+      }
     } catch {
       case e: Exception =>
         logError("Exception while beginning fetchBlocks", e)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ final class ShuffleBlockFetcherIterator(
    * Current [[FetchResult]] being processed. We track this so we can release the current buffer
    * in case of a runtime exception when processing the current buffer.
    */
-  private[this] var currentResult: FetchResult = null
+  @volatile private[this] var currentResult: FetchResult = null
 
   /**
    * Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
