Skip to content

Commit da9f9e0

Browse files
kanzhang authored and pwendell committed
[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations that do not change schema

Author: Kan Zhang <[email protected]>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema

(cherry picked from commit 967635a)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent 756c969 commit da9f9e0

File tree

7 files changed

+246
-22
lines changed

7 files changed

+246
-22
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
128128
@transient var name: String = null
129129

130130
/** Assign a name to this RDD */
131-
def setName(_name: String): RDD[T] = {
131+
def setName(_name: String): this.type = {
132132
name = _name
133133
this
134134
}
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
138138
* it is computed. This can only be used to assign a new storage level if the RDD does not
139139
* have a storage level set yet.
140140
*/
141-
def persist(newLevel: StorageLevel): RDD[T] = {
141+
def persist(newLevel: StorageLevel): this.type = {
142142
// TODO: Handle changes of StorageLevel
143143
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
144144
throw new UnsupportedOperationException(
@@ -152,18 +152,18 @@ abstract class RDD[T: ClassTag](
152152
}
153153

154154
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
155-
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
155+
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
156156

157157
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
158-
def cache(): RDD[T] = persist()
158+
def cache(): this.type = persist()
159159

160160
/**
161161
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
162162
*
163163
* @param blocking Whether to block until all blocks are deleted.
164164
* @return This RDD.
165165
*/
166-
def unpersist(blocking: Boolean = true): RDD[T] = {
166+
def unpersist(blocking: Boolean = true): this.type = {
167167
logInfo("Removing RDD " + id + " from persistence list")
168168
sc.unpersistRDD(id, blocking)
169169
storageLevel = StorageLevel.NONE

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](
5151

5252
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
5353

54-
override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
54+
override def persist(newLevel: StorageLevel): this.type = {
5555
partitionsRDD.persist(newLevel)
5656
this
5757
}
5858

59-
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
60-
override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
61-
62-
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
63-
override def cache(): EdgeRDD[ED] = persist()
64-
65-
override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
59+
override def unpersist(blocking: Boolean = true): this.type = {
6660
partitionsRDD.unpersist(blocking)
6761
this
6862
}

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
7171
override protected def getPreferredLocations(s: Partition): Seq[String] =
7272
partitionsRDD.preferredLocations(s)
7373

74-
override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
74+
override def persist(newLevel: StorageLevel): this.type = {
7575
partitionsRDD.persist(newLevel)
7676
this
7777
}
7878

79-
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
80-
override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
81-
82-
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
83-
override def cache(): VertexRDD[VD] = persist()
84-
85-
override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
79+
override def unpersist(blocking: Boolean = true): this.type = {
8680
partitionsRDD.unpersist(blocking)
8781
this
8882
}

project/MimaBuild.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ object MimaBuild {
7474
) ++
7575
excludeSparkClass("rdd.ClassTags") ++
7676
excludeSparkClass("util.XORShiftRandom") ++
77+
excludeSparkClass("graphx.EdgeRDD") ++
78+
excludeSparkClass("graphx.VertexRDD") ++
7779
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
7880
excludeSparkClass("mllib.optimization.SquaredGradient") ++
7981
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++

python/pyspark/sql.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,35 @@ def getCheckpointFile(self):
360360
else:
361361
return None
362362

363+
def coalesce(self, numPartitions, shuffle=False):
364+
rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
365+
return SchemaRDD(rdd, self.sql_ctx)
366+
367+
def distinct(self):
368+
rdd = self._jschema_rdd.distinct()
369+
return SchemaRDD(rdd, self.sql_ctx)
370+
371+
def intersection(self, other):
372+
if (other.__class__ is SchemaRDD):
373+
rdd = self._jschema_rdd.intersection(other._jschema_rdd)
374+
return SchemaRDD(rdd, self.sql_ctx)
375+
else:
376+
raise ValueError("Can only intersect with another SchemaRDD")
377+
378+
def repartition(self, numPartitions):
379+
rdd = self._jschema_rdd.repartition(numPartitions)
380+
return SchemaRDD(rdd, self.sql_ctx)
381+
382+
def subtract(self, other, numPartitions=None):
383+
if (other.__class__ is SchemaRDD):
384+
if numPartitions is None:
385+
rdd = self._jschema_rdd.subtract(other._jschema_rdd)
386+
else:
387+
rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
388+
return SchemaRDD(rdd, self.sql_ctx)
389+
else:
390+
raise ValueError("Can only subtract another SchemaRDD")
391+
363392
def _test():
364393
import doctest
365394
from pyspark.context import SparkContext

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package org.apache.spark.sql
1919

2020
import net.razorvine.pickle.Pickler
2121

22-
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
22+
import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
2323
import org.apache.spark.annotation.{AlphaComponent, Experimental}
2424
import org.apache.spark.rdd.RDD
25+
import org.apache.spark.sql.api.java.JavaSchemaRDD
2526
import org.apache.spark.sql.catalyst.analysis._
2627
import org.apache.spark.sql.catalyst.expressions._
2728
import org.apache.spark.sql.catalyst.plans.logical._
2829
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
2930
import org.apache.spark.sql.catalyst.types.BooleanType
31+
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
3032
import org.apache.spark.api.java.JavaRDD
3133
import java.util.{Map => JMap}
3234

@@ -296,6 +298,13 @@ class SchemaRDD(
296298
*/
297299
def toSchemaRDD = this
298300

301+
/**
302+
* Returns this RDD as a JavaSchemaRDD.
303+
*
304+
* @group schema
305+
*/
306+
def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
307+
299308
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
300309
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
301310
this.mapPartitions { iter =>
@@ -314,4 +323,60 @@ class SchemaRDD(
314323
}
315324
}
316325
}
326+
327+
/**
328+
* Creates a SchemaRDD by applying this RDD's schema to a derived RDD. Typically used to wrap the
329+
* return value of base RDD functions that do not change the schema.
330+
*
331+
* @param rdd an RDD derived from this one that has the same schema
332+
*
333+
* @group schema
334+
*/
335+
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
336+
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
337+
}
338+
339+
// =======================================================================
340+
// Base RDD functions that do NOT change schema
341+
// =======================================================================
342+
343+
// Transformations (return a new RDD)
344+
345+
override def coalesce(numPartitions: Int, shuffle: Boolean = false)
346+
(implicit ord: Ordering[Row] = null): SchemaRDD =
347+
applySchema(super.coalesce(numPartitions, shuffle)(ord))
348+
349+
override def distinct(): SchemaRDD =
350+
applySchema(super.distinct())
351+
352+
override def distinct(numPartitions: Int)
353+
(implicit ord: Ordering[Row] = null): SchemaRDD =
354+
applySchema(super.distinct(numPartitions)(ord))
355+
356+
override def filter(f: Row => Boolean): SchemaRDD =
357+
applySchema(super.filter(f))
358+
359+
override def intersection(other: RDD[Row]): SchemaRDD =
360+
applySchema(super.intersection(other))
361+
362+
override def intersection(other: RDD[Row], partitioner: Partitioner)
363+
(implicit ord: Ordering[Row] = null): SchemaRDD =
364+
applySchema(super.intersection(other, partitioner)(ord))
365+
366+
override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
367+
applySchema(super.intersection(other, numPartitions))
368+
369+
override def repartition(numPartitions: Int)
370+
(implicit ord: Ordering[Row] = null): SchemaRDD =
371+
applySchema(super.repartition(numPartitions)(ord))
372+
373+
override def subtract(other: RDD[Row]): SchemaRDD =
374+
applySchema(super.subtract(other))
375+
376+
override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
377+
applySchema(super.subtract(other, numPartitions))
378+
379+
override def subtract(other: RDD[Row], p: Partitioner)
380+
(implicit ord: Ordering[Row] = null): SchemaRDD =
381+
applySchema(super.subtract(other, p)(ord))
317382
}

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.spark.sql.api.java
1919

20+
import org.apache.spark.Partitioner
2021
import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
22+
import org.apache.spark.api.java.function.{Function => JFunction}
2123
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
2224
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2325
import org.apache.spark.rdd.RDD
26+
import org.apache.spark.storage.StorageLevel
2427

2528
/**
2629
* An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
4548
override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
4649

4750
val rdd = baseSchemaRDD.map(new Row(_))
51+
52+
override def toString: String = baseSchemaRDD.toString
53+
54+
// =======================================================================
55+
// Base RDD functions that do NOT change schema
56+
// =======================================================================
57+
58+
// Common RDD functions
59+
60+
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
61+
def cache(): JavaSchemaRDD = {
62+
baseSchemaRDD.cache()
63+
this
64+
}
65+
66+
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
67+
def persist(): JavaSchemaRDD = {
68+
baseSchemaRDD.persist()
69+
this
70+
}
71+
72+
/**
73+
* Set this RDD's storage level to persist its values across operations after the first time
74+
* it is computed. This can only be used to assign a new storage level if the RDD does not
75+
* have a storage level set yet.
76+
*/
77+
def persist(newLevel: StorageLevel): JavaSchemaRDD = {
78+
baseSchemaRDD.persist(newLevel)
79+
this
80+
}
81+
82+
/**
83+
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
84+
*
85+
* @param blocking Whether to block until all blocks are deleted.
86+
* @return This RDD.
87+
*/
88+
def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
89+
baseSchemaRDD.unpersist(blocking)
90+
this
91+
}
92+
93+
/** Assign a name to this RDD */
94+
def setName(name: String): JavaSchemaRDD = {
95+
baseSchemaRDD.setName(name)
96+
this
97+
}
98+
99+
// Transformations (return a new RDD)
100+
101+
/**
102+
* Return a new RDD that is reduced into `numPartitions` partitions.
103+
*/
104+
def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
105+
baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
106+
107+
/**
108+
* Return a new RDD containing the distinct elements in this RDD.
109+
*/
110+
def distinct(): JavaSchemaRDD =
111+
baseSchemaRDD.distinct().toJavaSchemaRDD
112+
113+
/**
114+
* Return a new RDD containing the distinct elements in this RDD.
115+
*/
116+
def distinct(numPartitions: Int): JavaSchemaRDD =
117+
baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
118+
119+
/**
120+
* Return a new RDD containing only the elements that satisfy a predicate.
121+
*/
122+
def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
123+
baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
124+
125+
/**
126+
* Return the intersection of this RDD and another one. The output will not contain any
127+
* duplicate elements, even if the input RDDs did.
128+
*
129+
* Note that this method performs a shuffle internally.
130+
*/
131+
def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
132+
this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
133+
134+
/**
135+
* Return the intersection of this RDD and another one. The output will not contain any
136+
* duplicate elements, even if the input RDDs did.
137+
*
138+
* Note that this method performs a shuffle internally.
139+
*
140+
* @param partitioner Partitioner to use for the resulting RDD
141+
*/
142+
def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
143+
this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
144+
145+
/**
146+
* Return the intersection of this RDD and another one. The output will not contain any
147+
* duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster.
148+
*
149+
* Note that this method performs a shuffle internally.
150+
*
151+
* @param numPartitions How many partitions to use in the resulting RDD
152+
*/
153+
def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
154+
this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
155+
156+
/**
157+
* Return a new RDD that has exactly `numPartitions` partitions.
158+
*
159+
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
160+
* a shuffle to redistribute data.
161+
*
162+
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
163+
* which can avoid performing a shuffle.
164+
*/
165+
def repartition(numPartitions: Int): JavaSchemaRDD =
166+
baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
167+
168+
/**
169+
* Return an RDD with the elements from `this` that are not in `other`.
170+
*
171+
* Uses `this` RDD's partitioner/partition size because, even if `other` is huge, the resulting
171+
* RDD will be no larger than `this`.
173+
*/
174+
def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
175+
this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
176+
177+
/**
178+
* Return an RDD with the elements from `this` that are not in `other`.
179+
*/
180+
def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
181+
this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
182+
183+
/**
184+
* Return an RDD with the elements from `this` that are not in `other`.
185+
*/
186+
def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
187+
this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
48188
}

0 commit comments

Comments
 (0)