
Commit 79ed52a

[SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
Squashed commit messages:

- Adding some missing functions
- Adding a few more functions
- Simply wrap overridden methods in base RDD class
- Adding functions to Java API
- Removing Scaladoc on overriding methods
- Removing implicit conversions
- Renaming wrapSchemaRDD to fromSchemaRDD
- A better toString for JavaSchemaRDD
- Use this.type as return type
- Adding Python API changes for SchemaRDD
- Replacing fromSchemaRDD() with toJavaSchemaRDD()
- Avoid using this.type in Java API
- Removing redundant methods in EdgeRDD and VertexRDD
1 parent 0a5a468 commit 79ed52a

6 files changed: +239 -22 lines

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 5 deletions
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
   @transient var name: String = null
 
   /** Assign a name to this RDD */
-  def setName(_name: String): RDD[T] = {
+  def setName(_name: String): this.type = {
     name = _name
     this
   }
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
    * it is computed. This can only be used to assign a new storage level if the RDD does not
    * have a storage level set yet..
    */
-  def persist(newLevel: StorageLevel): RDD[T] = {
+  def persist(newLevel: StorageLevel): this.type = {
     // TODO: Handle changes of StorageLevel
     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
       throw new UnsupportedOperationException(
@@ -152,18 +152,18 @@ abstract class RDD[T: ClassTag](
   }
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): RDD[T] = persist()
+  def cache(): this.type = persist()
 
   /**
    * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
    *
    * @param blocking Whether to block until all blocks are deleted.
    * @return This RDD.
    */
-  def unpersist(blocking: Boolean = true): RDD[T] = {
+  def unpersist(blocking: Boolean = true): this.type = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
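The change from `RDD[T]` to `this.type` is what lets subclasses inherit these methods without re-overriding them just to narrow the return type. A minimal sketch of the effect, using a hypothetical subclass (not part of this commit) and assuming a live SparkContext named `sc`:

  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // Hypothetical subclass, for illustration only:
  class MyRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
    override def compute(split: Partition, context: TaskContext): Iterator[Int] =
      Iterator.empty
    override protected def getPartitions: Array[Partition] = Array.empty
  }

  val myRdd = new MyRDD(sc)
  // Before this change, setName returned RDD[Int]; with this.type the
  // chained result keeps the subclass type:
  val named: MyRDD = myRdd.setName("my-rdd").cache()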

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 2 additions & 8 deletions
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
-  override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): EdgeRDD[ED] = persist()
-
-  override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }
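The deleted `persist()` and `cache()` overrides are now redundant (the same cleanup is applied to VertexRDD below): the base-class versions return `this.type` and delegate to `persist(newLevel)`, which EdgeRDD still overrides to persist its internal `partitionsRDD`. A hedged sketch of the resulting behavior, assuming a hypothetical `Graph[Int, Double]` value named `graph`:

  import org.apache.spark.graphx.{EdgeRDD, Graph}

  // cache() is inherited from RDD, dispatches to EdgeRDD's
  // persist(newLevel) override with MEMORY_ONLY, and this.type
  // keeps the static type EdgeRDD[Double]:
  val cachedEdges: EdgeRDD[Double] = graph.edges.cache()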

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 2 additions & 8 deletions
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
-  override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): VertexRDD[VD] = persist()
-
-  override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }

python/pyspark/sql.py

Lines changed: 29 additions & 0 deletions
@@ -360,6 +360,35 @@ def getCheckpointFile(self):
         else:
             return None
 
+    def coalesce(self, numPartitions, shuffle=False):
+        rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def distinct(self):
+        rdd = self._jschema_rdd.distinct()
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def intersection(self, other):
+        if (other.__class__ is SchemaRDD):
+            rdd = self._jschema_rdd.intersection(other._jschema_rdd)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only intersect with another SchemaRDD")
+
+    def repartition(self, numPartitions):
+        rdd = self._jschema_rdd.repartition(numPartitions)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def subtract(self, other, numPartitions=None):
+        if (other.__class__ is SchemaRDD):
+            if numPartitions is None:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd)
+            else:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only subtract another SchemaRDD")
+
 
 def _test():
     import doctest
     from pyspark.context import SparkContext
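The `numPartitions` branch in `subtract` mirrors the two Java-side overloads added in this commit: `_jschema_rdd` holds a JavaSchemaRDD, and the Python wrappers call it through Py4J. A sketch of the equivalent calls on the JVM side, with `people` and `other` as hypothetical JavaSchemaRDD values:

  import org.apache.spark.sql.api.java.JavaSchemaRDD

  // numPartitions omitted in Python -> the single-argument overload:
  val defaultParts: JavaSchemaRDD = people.subtract(other)
  // numPartitions given in Python -> the overload with a partition count:
  val fixedParts: JavaSchemaRDD = people.subtract(other, 16)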

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 61 additions & 1 deletion
@@ -19,14 +19,16 @@ package org.apache.spark.sql
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.annotation.{AlphaComponent, Experimental}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.api.java.JavaSchemaRDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
 import java.util.{Map => JMap}
 
@@ -296,6 +298,13 @@ class SchemaRDD(
    */
   def toSchemaRDD = this
 
+  /**
+   * Returns this RDD as a JavaSchemaRDD.
+   *
+   * @group schema
+   */
+  def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
     this.mapPartitions { iter =>
@@ -314,4 +323,55 @@ class SchemaRDD(
       }
     }
   }
+
+  /**
+   * Creates SchemaRDD by applying own schema to derived RDD. Typically used to wrap return value
+   * of base RDD functions that do not change schema.
+   *
+   * @param rdd RDD derived from this one and has same schema
+   *
+   * @group schema
+   */
+  private def applySchema(rdd: RDD[Row]): SchemaRDD = {
+    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
+  }
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Transformations (return a new RDD)
+
+  override def coalesce(numPartitions: Int, shuffle: Boolean = false): SchemaRDD =
+    applySchema(super.coalesce(numPartitions, shuffle))
+
+  override def distinct(): SchemaRDD =
+    applySchema(super.distinct())
+
+  override def distinct(numPartitions: Int): SchemaRDD =
+    applySchema(super.distinct(numPartitions))
+
+  override def filter(f: Row => Boolean): SchemaRDD =
+    applySchema(super.filter(f))
+
+  override def intersection(other: RDD[Row]): SchemaRDD =
+    applySchema(super.intersection(other))
+
+  override def intersection(other: RDD[Row], partitioner: Partitioner): SchemaRDD =
+    applySchema(super.intersection(other, partitioner))
+
+  override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.intersection(other, numPartitions))
+
+  override def repartition(numPartitions: Int): SchemaRDD =
+    applySchema(super.repartition(numPartitions))
+
+  override def subtract(other: RDD[Row]): SchemaRDD =
+    applySchema(super.subtract(other))
+
+  override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.subtract(other, numPartitions))
+
+  override def subtract(other: RDD[Row], p: Partitioner): SchemaRDD =
+    applySchema(super.subtract(other, p))
 }
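Because `applySchema` re-wraps each result with the parent's own output schema, the result of a set operation remains a queryable SchemaRDD rather than a plain `RDD[Row]`. A hedged usage sketch, assuming a SchemaRDD named `people` with an `age` column and the catalyst DSL implicits in scope:

  // distinct() now returns SchemaRDD, so language-integrated queries
  // still work on the result:
  val adults: SchemaRDD = people.distinct().where('age > 21)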

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala

Lines changed: 140 additions & 0 deletions
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.api.java
 
+import org.apache.spark.Partitioner
 import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
   override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
 
   val rdd = baseSchemaRDD.map(new Row(_))
+
+  override def toString: String = baseSchemaRDD.toString
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Common RDD functions
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def cache(): JavaSchemaRDD = {
+    baseSchemaRDD.cache()
+    this
+  }
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def persist(): JavaSchemaRDD = {
+    baseSchemaRDD.persist()
+    this
+  }
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. This can only be used to assign a new storage level if the RDD does not
+   * have a storage level set yet..
+   */
+  def persist(newLevel: StorageLevel): JavaSchemaRDD = {
+    baseSchemaRDD.persist(newLevel)
+    this
+  }
+
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
+    baseSchemaRDD.unpersist(blocking)
+    this
+  }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaSchemaRDD = {
+    baseSchemaRDD.setName(name)
+    this
+  }
+
+  // Transformations (return a new RDD)
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
+    baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(): JavaSchemaRDD =
+    baseSchemaRDD.distinct().toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
+  def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
+    baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   */
+  def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param partitioner Partitioner to use for the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param numPartitions How many partitions to use in the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD that has exactly `numPartitions` partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be <= us.
+   */
+  def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
 }
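These wrappers matter because the generic methods on JavaRDDLike would otherwise hand back a plain `JavaRDD[Row]` and lose the schema-aware type. A hedged chaining sketch, with `people` as a hypothetical JavaSchemaRDD:

  import org.apache.spark.sql.api.java.JavaSchemaRDD

  // Each call returns JavaSchemaRDD, so Java-API users can chain the
  // set operations with the persistence and naming helpers:
  val result: JavaSchemaRDD = people.distinct(8).setName("distinct-people").cache()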
