
Commit aa0717d

Merge remote-tracking branch 'upstream/master'
2 parents (5f109b4 + feb3a9d), commit aa0717d

File tree

202 files changed: +7471 additions, -1963 deletions

Note: large commits have some content hidden by default, so only a subset of the 202 changed files is shown below.

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 5 additions & 5 deletions
@@ -757,12 +757,12 @@ test_that("parquetFile works with multiple input paths", {
 test_that("describe() on a DataFrame", {
   df <- jsonFile(sqlCtx, jsonPath)
   stats <- describe(df, "age")
-  expect_true(collect(stats)[1, "summary"] == "count")
-  expect_true(collect(stats)[2, "age"] == 24.5)
-  expect_true(collect(stats)[3, "age"] == 5.5)
+  expect_equal(collect(stats)[1, "summary"], "count")
+  expect_equal(collect(stats)[2, "age"], "24.5")
+  expect_equal(collect(stats)[3, "age"], "5.5")
   stats <- describe(df)
-  expect_true(collect(stats)[4, "name"] == "Andy")
-  expect_true(collect(stats)[5, "age"] == 30.0)
+  expect_equal(collect(stats)[4, "name"], "Andy")
+  expect_equal(collect(stats)[5, "age"], "30")
 })

 unlink(parquetPath)
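The switch from expect_true to expect_equal also changes the expected values to strings, because DataFrame.describe() returns every statistic as a string column. A minimal Scala sketch of the same behaviour, assuming a local SQLContext named sqlContext and the standard people.json example data (ages 30 and 19 plus one null), neither of which is part of this commit:

// Sketch only; sqlContext and the JSON path are assumptions for illustration.
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
val stats = df.describe("age")
stats.show()
// summary | age
// count   | 2
// mean    | 24.5   <- returned as the string "24.5", matching the SparkR assertions above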

core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css

Lines changed: 44 additions & 27 deletions
@@ -15,32 +15,21 @@
  * limitations under the License.
  */

-#dag-viz-graph svg path {
-  stroke: #444;
-  stroke-width: 1.5px;
-}
-
-#dag-viz-graph svg g.cluster rect {
-  stroke-width: 1px;
-}
-
-#dag-viz-graph svg g.node circle {
-  fill: #444;
+#dag-viz-graph a, #dag-viz-graph a:hover {
+  text-decoration: none;
 }

-#dag-viz-graph svg g.node rect {
-  fill: #C3EBFF;
-  stroke: #3EC0FF;
-  stroke-width: 1px;
+#dag-viz-graph .label {
+  font-weight: normal;
+  text-shadow: none;
 }

-#dag-viz-graph svg g.node.cached circle {
-  fill: #444;
+#dag-viz-graph svg path {
+  stroke: #444;
+  stroke-width: 1.5px;
 }

-#dag-viz-graph svg g.node.cached rect {
-  fill: #B3F5C5;
-  stroke: #56F578;
+#dag-viz-graph svg g.cluster rect {
   stroke-width: 1px;
 }

@@ -61,12 +50,23 @@
   stroke-width: 1px;
 }

-#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.job g.cluster.skipped rect {
+  fill: #D6D6D6;
+  stroke: #B7B7B7;
+  stroke-width: 1px;
+}
+
+#dag-viz-graph svg.job g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FF99AC;
   stroke-width: 1px;
 }

+#dag-viz-graph svg.job g.cluster.stage.skipped rect {
+  stroke: #ADADAD;
+  stroke-width: 1px;
+}
+
 #dag-viz-graph svg.job g#cross-stage-edges path {
   fill: none;
 }
@@ -75,6 +75,20 @@
   fill: #333;
 }

+#dag-viz-graph svg.job g.cluster.skipped text {
+  fill: #666;
+}
+
+#dag-viz-graph svg.job g.node circle {
+  fill: #444;
+}
+
+#dag-viz-graph svg.job g.node.cached circle {
+  fill: #A3F545;
+  stroke: #52C366;
+  stroke-width: 2px;
+}
+
 /* Stage page specific styles */

 #dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
   stroke-width: 1px;
 }

-#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.stage g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FFA6B6;
   stroke-width: 1px;
@@ -97,11 +111,14 @@
   fill: #333;
 }

-#dag-viz-graph a, #dag-viz-graph a:hover {
-  text-decoration: none;
+#dag-viz-graph svg.stage g.node rect {
+  fill: #C3EBFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
 }

-#dag-viz-graph .label {
-  font-weight: normal;
-  text-shadow: none;
+#dag-viz-graph svg.stage g.node.cached rect {
+  fill: #B3F5C5;
+  stroke: #52C366;
+  stroke-width: 2px;
 }

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 32 additions & 18 deletions
@@ -57,9 +57,7 @@ var VizConstants = {
   stageSep: 40,
   graphPrefix: "graph_",
   nodePrefix: "node_",
-  stagePrefix: "stage_",
-  clusterPrefix: "cluster_",
-  stageClusterPrefix: "cluster_stage_"
+  clusterPrefix: "cluster_"
 };

 var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
   }

   // Render
-  var svg = graphContainer()
-    .append("svg")
-    .attr("class", jobOrStage);
+  var svg = graphContainer().append("svg").attr("class", jobOrStage);
   if (forJob) {
     renderDagVizForJob(svg);
   } else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
     var dot = metadata.select(".dot-file").text();
     var stageId = metadata.attr("stage-id");
     var containerId = VizConstants.graphPrefix + stageId;
-    // Link each graph to the corresponding stage page (TODO: handle stage attempts)
-    var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
-      .find("a")
-      .attr("href") + "&expandDagViz=true";
-    var container = svgContainer
-      .append("a")
-      .attr("xlink:href", stageLink)
-      .append("g")
-      .attr("id", containerId);
+    var isSkipped = metadata.attr("skipped") == "true";
+    var container;
+    if (isSkipped) {
+      container = svgContainer
+        .append("g")
+        .attr("id", containerId)
+        .attr("skipped", "true");
+    } else {
+      // Link each graph to the corresponding stage page (TODO: handle stage attempts)
+      // Use the link from the stage table so it also works for the history server
+      var attemptId = 0
+      var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
+        .select("a")
+        .attr("href") + "&expandDagViz=true";
+      container = svgContainer
+        .append("a")
+        .attr("xlink:href", stageLink)
+        .append("g")
+        .attr("id", containerId);
+    }

     // Now we need to shift the container for this stage so it doesn't overlap with
     // existing ones, taking into account the position and width of the last stage's
     // container. We do not need to do this for the first stage of this job.
     if (i > 0) {
-      var existingStages = svgContainer
-        .selectAll("g.cluster")
-        .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
+      var existingStages = svgContainer.selectAll("g.cluster.stage")
       if (!existingStages.empty()) {
         var lastStage = d3.select(existingStages[0].pop());
         var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@ function renderDagVizForJob(svgContainer) {
     // Actually render the stage
     renderDot(dot, container, true);

+    // Mark elements as skipped if appropriate. Unfortunately we need to mark all
+    // elements instead of the parent container because of CSS override rules.
+    if (isSkipped) {
+      container.selectAll("g").classed("skipped", true);
+    }
+
     // Round corners on rectangles
     container
       .selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
   var renderer = new dagreD3.render();
   preprocessGraphLayout(g, forJob);
   renderer(container, g);
+
+  // Find the stage cluster and mark it for styling and post-processing
+  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
 }

 /* -------------------- *

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 75 additions & 3 deletions
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

   // Methods for creating RDDs

@@ -697,6 +697,78 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }

+  /**
+   * Creates a new RDD[Long] containing elements from `start` to `end`(exclusive), increased by
+   * `step` every element.
+   *
+   * @note if we need to cache this RDD, we should make sure each partition does not exceed limit.
+   *
+   * @param start the start value.
+   * @param end the end value.
+   * @param step the incremental step
+   * @param numSlices the partition number of the new RDD.
+   * @return
+   */
+  def range(
+      start: Long,
+      end: Long,
+      step: Long = 1,
+      numSlices: Int = defaultParallelism): RDD[Long] = withScope {
+    assertNotStopped()
+    // when step is 0, range will run infinitely
+    require(step != 0, "step cannot be 0")
+    val numElements: BigInt = {
+      val safeStart = BigInt(start)
+      val safeEnd = BigInt(end)
+      if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
+        (safeEnd - safeStart) / step
+      } else {
+        // the remainder has the same sign with range, could add 1 more
+        (safeEnd - safeStart) / step + 1
+      }
+    }
+    parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
+      val partitionStart = (i * numElements) / numSlices * step + start
+      val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+      def getSafeMargin(bi: BigInt): Long =
+        if (bi.isValidLong) {
+          bi.toLong
+        } else if (bi > 0) {
+          Long.MaxValue
+        } else {
+          Long.MinValue
+        }
+      val safePartitionStart = getSafeMargin(partitionStart)
+      val safePartitionEnd = getSafeMargin(partitionEnd)
+
+      new Iterator[Long] {
+        private[this] var number: Long = safePartitionStart
+        private[this] var overflow: Boolean = false
+
+        override def hasNext =
+          if (!overflow) {
+            if (step > 0) {
+              number < safePartitionEnd
+            } else {
+              number > safePartitionEnd
+            }
+          } else false
+
+        override def next() = {
+          val ret = number
+          number += step
+          if (number < ret ^ step < 0) {
+            // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
+            // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
+            // back, we are pretty sure that we have an overflow.
+            overflow = true
+          }
+          ret
+        }
+      }
+    })
+  }
+
   /** Distribute a local Scala collection to form an RDD.
    *
    * This method is identical to `parallelize`.
@@ -1087,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
     withScope {
       assertNotStopped()
-      val kc = kcf()
-      val vc = vcf()
+      val kc = clean(kcf)()
+      val vc = clean(vcf)()
       val format = classOf[SequenceFileInputFormat[Writable, Writable]]
       val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
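For reference, a hedged usage sketch of the new SparkContext.range API added above; the SparkConf setup and names are illustrative and not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("range-example").setMaster("local[2]"))
// 50 elements (0, 2, ..., 98) spread over 4 partitions without overflowing Long arithmetic
val evens = sc.range(0L, 100L, step = 2, numSlices = 4)
println(evens.count())  // 50
sc.stop()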

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,7 @@ private[spark] class PythonRDD(
     pythonIncludes: JList[String],
     preservePartitoning: Boolean,
     pythonExec: String,
+    pythonVer: String,
     broadcastVars: JList[Broadcast[PythonBroadcast]],
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
@@ -210,6 +211,8 @@ private[spark] class PythonRDD(
         val dataOut = new DataOutputStream(stream)
         // Partition index
         dataOut.writeInt(split.index)
+        // Python version of driver
+        PythonRDD.writeUTF(pythonVer, dataOut)
         // sparkFilesDir
         PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
         // Python includes (*.zip and *.egg files)

core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

Lines changed: 9 additions & 2 deletions
@@ -50,8 +50,15 @@ private[spark] object PythonUtils {
   /**
    * Convert list of T into seq of T (for calling API with varargs)
    */
-  def toSeq[T](cols: JList[T]): Seq[T] = {
-    cols.toList.toSeq
+  def toSeq[T](vs: JList[T]): Seq[T] = {
+    vs.toList.toSeq
+  }
+
+  /**
+   * Convert list of T into array of T (for calling API with array)
+   */
+  def toArray[T](vs: JList[T]): Array[T] = {
+    vs.toArray().asInstanceOf[Array[T]]
   }

   /**

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 3 additions & 2 deletions
@@ -25,7 +25,8 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
+  UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -125,7 +126,7 @@ class HistoryServer(
   def initialize() {
     attachPage(new HistoryPage(this))

-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))

     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 3 additions & 2 deletions
@@ -19,7 +19,8 @@ package org.apache.spark.deploy.master.ui

 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationsListResource, ApplicationInfo,
+  UIRoot}
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.RpcUtils
@@ -47,7 +48,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     attachPage(new HistoryNotFoundPage(this))
     attachPage(masterPage)
     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))
     attachHandler(createRedirectHandler(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
