Commit dec85db

Merge branch 'master' of git://git.apache.org/spark into timeline-viewer-feature
2 parents fcdab7d + 15e0d2b commit dec85db

63 files changed: +338 -289 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 11 additions & 1 deletion
@@ -1901,7 +1901,17 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
 
-  private[spark] val DRIVER_IDENTIFIER = "<driver>"
+  /**
+   * Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
+   * changed to `driver` because the angle brackets caused escaping issues in URLs and XML (see
+   * SPARK-6716 for more details).
+   */
+  private[spark] val DRIVER_IDENTIFIER = "driver"
+
+  /**
+   * Legacy version of DRIVER_IDENTIFIER, retained for backwards-compatibility.
+   */
+  private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"
 
   // The following deprecated objects have already been copied to `object AccumulatorParam` to
   // make the compiler find them automatically. They are duplicate codes only for backward
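The new doc comment explains the rename: the driver's executor id ends up embedded in web UI URLs and XML output, where `<` and `>` must be escaped. A minimal, hypothetical sketch (not part of this commit; it only uses `java.net.URLEncoder`) of how the old identifier gets mangled:

import java.net.URLEncoder

// Illustrative demo only: shows why "<driver>" is awkward once it has to be
// URL-encoded, e.g. when building web UI links keyed by executor id.
object DriverIdEscapingDemo {
  def main(args: Array[String]): Unit = {
    val legacyId = "<driver>"   // value of LEGACY_DRIVER_IDENTIFIER
    val newId    = "driver"     // value of DRIVER_IDENTIFIER after this change

    println(URLEncoder.encode(legacyId, "UTF-8")) // prints %3Cdriver%3E
    println(URLEncoder.encode(newId, "UTF-8"))    // prints driver

    // In XML output the brackets would similarly have to become &lt;driver&gt;,
    // which is the escaping issue SPARK-6716 describes.
  }
}

LEGACY_DRIVER_IDENTIFIER is kept so code paths that still compare against the old `<driver>` string remain backwards-compatible.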

core/src/main/scala/org/apache/spark/TaskContextHelper.scala

Lines changed: 0 additions & 29 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
 
   /* Find out driver status then exit the JVM */
   def pollAndReportStatus(driverId: String) {
-    println(s"... waiting before polling master for driver state")
+    println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
     val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 1 addition & 1 deletion
@@ -245,7 +245,7 @@ private[deploy] class StandaloneRestClient extends Logging {
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError("Application submission failed" + failMessage)
+      logError(s"Application submission failed$failMessage")
     }
   }

core/src/main/scala/org/apache/spark/network/nio/Connection.scala

Lines changed: 1 addition & 1 deletion
@@ -181,7 +181,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
     buffer.get(bytes)
     bytes.foreach(x => print(x + " "))
     buffer.position(curPosition)
-    print(" (" + bytes.size + ")")
+    print(" (" + bytes.length + ")")
   }
 
   def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
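This hunk is the first of many in the merge that swap `.size` for `.length` on arrays. A plausible reading (not stated in the diff): on a JVM array, `.length` is a direct field read, while `.size` only exists through Predef's implicit conversion to `ArrayOps`, which allocates a short-lived wrapper on every call. A tiny sketch of the two call paths:

// Illustrative only: both expressions return the same value; the difference is
// that .size goes through the implicit Array-to-ArrayOps wrapping in Predef.
object ArraySizeVsLength {
  def main(args: Array[String]): Unit = {
    val partitions = new Array[Int](8)

    val n1 = partitions.length // direct length field of the JVM array
    val n2 = partitions.size   // via implicit ArrayOps wrapper (SeqLike.size)

    assert(n1 == n2 && n1 == 8)
    println(s"length = $n1, size = $n2")
  }
}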

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 5 additions & 5 deletions
@@ -45,7 +45,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         }
         result
       },
-      Range(0, self.partitions.size),
+      Range(0, self.partitions.length),
       (index: Int, data: Long) => totalCount.addAndGet(data),
       totalCount.get())
   }
@@ -54,8 +54,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val results = new Array[Array[T]](self.partitions.size)
-    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+    val results = new Array[Array[T]](self.partitions.length)
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
       (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
@@ -111,15 +111,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    */
   def foreachAsync(f: T => Unit): FutureAction[Unit] = {
     val cleanF = self.context.clean(f)
-    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 
   /**
    * Applies a function f to each partition of this RDD.
   */
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 }
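For orientation, these async actions are the ones exposed on RDDs via the implicit conversion to AsyncRDDActions, each returning a FutureAction that is also a standard Scala Future. A hypothetical usage sketch (app name, master, and sizes are illustrative, not from the commit):

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

// Illustrative driver program exercising countAsync/collectAsync.
object AsyncActionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("async-demo"))
    val rdd = sc.parallelize(1 to 1000, numSlices = 4)

    val countFuture = rdd.countAsync()     // FutureAction[Long]
    val collectFuture = rdd.collectAsync() // FutureAction[Seq[Int]]

    // FutureAction extends scala.concurrent.Future, so Await works on it.
    println(Await.result(countFuture, 30.seconds))            // 1000
    println(Await.result(collectFuture, 30.seconds).take(5))  // first few elements

    sc.stop()
  }
}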

core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   override def getPartitions: Array[Partition] = {
     assertValid()
-    (0 until blockIds.size).map(i => {
+    (0 until blockIds.length).map(i => {
       new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
     }).toArray
   }

core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -53,11 +53,11 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     extends RDD[Pair[T, U]](sc, Nil)
     with Serializable {
 
-  val numPartitionsInRdd2 = rdd2.partitions.size
+  val numPartitionsInRdd2 = rdd2.partitions.length
 
   override def getPartitions: Array[Partition] = {
     // create the cross product split
-    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
+    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
     for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
       val idx = s1.index * numPartitionsInRdd2 + s2.index
       array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
     if (fs.exists(cpath)) {
       val dirContents = fs.listStatus(cpath).map(_.getPath)
       val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
-      val numPart = partitionFiles.size
+      val numPart = partitionFiles.length
       if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
           ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
         throw new SparkException("Invalid checkpoint directory: " + checkpointPath)

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -99,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def getPartitions: Array[Partition] = {
     val array = new Array[Partition](part.numPartitions)
-    for (i <- 0 until array.size) {
+    for (i <- 0 until array.length) {
       // Each CoGroupPartition will have a dependency per contributing RDD
       array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
         // Assume each RDD contributed a single dependency, and get it
@@ -120,7 +120,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
-    val numRdds = split.deps.size
+    val numRdds = split.deps.length
 
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
