[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations... #448

Closed · wants to merge 3 commits
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
@transient var name: String = null

/** Assign a name to this RDD */
- def setName(_name: String): RDD[T] = {
+ def setName(_name: String): this.type = {
name = _name
this
}
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet.
*/
- def persist(newLevel: StorageLevel): RDD[T] = {
+ def persist(newLevel: StorageLevel): this.type = {
// TODO: Handle changes of StorageLevel
Contributor: I am fairly ignorant of Scala; I am not sure I follow: where is type coming from, and what is it exactly?
Also, does this change mean it is an incompatible interface change?

Contributor: It's so that child classes do not have to override functions like persist and cache that are used for chaining:

http://scalada.blogspot.com/2008/02/thistype-for-chaining-method-calls.html
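
For readers unfamiliar with the pattern, here is a minimal sketch (hypothetical classes, not part of this patch) of how a this.type return type lets a subclass inherit chaining methods while keeping its own static type:

```scala
// Hypothetical illustration of this.type chaining; not Spark code.
class Base {
  var name: String = _
  // Declared as this.type, so on a subclass instance it statically returns the subclass.
  def setName(n: String): this.type = { name = n; this }
}

class Child extends Base {
  def childOnly(): this.type = this
}

object ThisTypeDemo extends App {
  // setName is inherited from Base, yet the chain still type-checks as Child.
  val c: Child = new Child().setName("example").childOnly()
  println(c.name)
}
```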

Contributor: Neat, thanks!

Contributor: So I guess this can't be applied to checkpointRDD and randomSplit?
What about things like filter, distinct, repartition, sample, filterWith, etc.?

Contributor: @mridulm if you look at this patch, it explicitly overrides those for SchemaRDD. You can't use this.type there because the return type is actually a new RDD class (FilteredRDD and so on).

Contributor Author: @mridulm agree with Patrick: you have to return this for a this.type return type.
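
To illustrate the point (a hypothetical sketch, not Spark code): this.type promises that the method returns the very same instance, so a transformation that constructs a new object cannot declare it:

```scala
// Hypothetical sketch: this.type only fits methods that return `this` itself.
class Box(val value: Int) {
  // Fine: returns the same instance, so this.type is a valid return type.
  def touch(): this.type = this

  // Would NOT compile if declared as `: this.type`, because a new Box is a different object:
  //   def doubled(): this.type = new Box(value * 2)   // error: type mismatch
  def doubled(): Box = new Box(value * 2)
}
```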

Contributor: Thanks for clarifying, in retrospect that looks obvious!

if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException(
@@ -152,18 +152,18 @@ abstract class RDD[T: ClassTag](
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def cache(): RDD[T] = persist()
+ def cache(): this.type = persist()

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
*/
- def unpersist(blocking: Boolean = true): RDD[T] = {
+ def unpersist(blocking: Boolean = true): this.type = {
logInfo("Removing RDD " + id + " from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
10 changes: 2 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

- override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+ override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
}

- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def cache(): EdgeRDD[ED] = persist()
-
- override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+ override def unpersist(blocking: Boolean = true): this.type = {
partitionsRDD.unpersist(blocking)
this
}
10 changes: 2 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
override protected def getPreferredLocations(s: Partition): Seq[String] =
partitionsRDD.preferredLocations(s)

- override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+ override def persist(newLevel: StorageLevel): this.type = {
Contributor: @rxin can we just remove these functions if RDD returns this.type instead? It looks like you were only overloading them for the covariant return type.

Contributor Author: @marmbrus it actually has its own logic.

Contributor: Oh right, I guess I was actually looking at the functions cache and persist below. Those could be dropped though?

Contributor Author: Ah, right.

partitionsRDD.persist(newLevel)
this
}

- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def cache(): VertexRDD[VD] = persist()
-
- override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+ override def unpersist(blocking: Boolean = true): this.type = {
partitionsRDD.unpersist(blocking)
this
}
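
The thread above asks whether the persist/cache overloads can be dropped. A rough sketch of the distinction, with simplified, hypothetical types rather than the actual Spark classes: the persist(newLevel) override is still needed because it delegates to partitionsRDD, while the no-argument persist() and cache() overloads become redundant once the base class returns this.type.

```scala
// Simplified, hypothetical model of the base-class/subclass relationship.
abstract class BaseRDD {
  def persist(level: String): this.type             // subclasses may override with custom logic
  def persist(): this.type = persist("MEMORY_ONLY") // inherited; already returns the subtype
  def cache(): this.type = persist()                // inherited; already returns the subtype
}

class PartitionedRDD(partitionsRDD: BaseRDD) extends BaseRDD {
  // Still overridden: the custom logic persists partitionsRDD rather than this RDD itself.
  override def persist(level: String): this.type = {
    partitionsRDD.persist(level)
    this
  }
  // No override of persist() or cache() needed: the inherited versions return PartitionedRDD.
}
```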
2 changes: 2 additions & 0 deletions project/MimaBuild.scala
@@ -74,6 +74,8 @@ object MimaBuild {
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
excludeSparkClass("graphx.EdgeRDD") ++
excludeSparkClass("graphx.VertexRDD") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
29 changes: 29 additions & 0 deletions python/pyspark/sql.py
@@ -360,6 +360,35 @@ def getCheckpointFile(self):
else:
return None

def coalesce(self, numPartitions, shuffle=False):
rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
return SchemaRDD(rdd, self.sql_ctx)

def distinct(self):
rdd = self._jschema_rdd.distinct()
return SchemaRDD(rdd, self.sql_ctx)

def intersection(self, other):
if (other.__class__ is SchemaRDD):
rdd = self._jschema_rdd.intersection(other._jschema_rdd)
return SchemaRDD(rdd, self.sql_ctx)
else:
raise ValueError("Can only intersect with another SchemaRDD")

def repartition(self, numPartitions):
rdd = self._jschema_rdd.repartition(numPartitions)
return SchemaRDD(rdd, self.sql_ctx)

def subtract(self, other, numPartitions=None):
if (other.__class__ is SchemaRDD):
if numPartitions is None:
rdd = self._jschema_rdd.subtract(other._jschema_rdd)
else:
rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
return SchemaRDD(rdd, self.sql_ctx)
else:
raise ValueError("Can only subtract another SchemaRDD")

def _test():
import doctest
from pyspark.context import SparkContext
67 changes: 66 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,14 +19,16 @@ package org.apache.spark.sql

import net.razorvine.pickle.Pickler

- import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+ import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
import java.util.{Map => JMap}

@@ -296,6 +298,13 @@ class SchemaRDD(
*/
def toSchemaRDD = this

/**
* Returns this RDD as a JavaSchemaRDD.
*
* @group schema
*/
def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)

private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
this.mapPartitions { iter =>
@@ -314,4 +323,60 @@ class SchemaRDD(
}
}
}

/**
* Creates a SchemaRDD by applying its own schema to a derived RDD. Typically used to wrap the
* return value of base RDD functions that do not change the schema.
*
* @param rdd an RDD derived from this one that has the same schema
*
* @group schema
*/
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
}

// =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================

// Transformations (return a new RDD)

override def coalesce(numPartitions: Int, shuffle: Boolean = false)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.coalesce(numPartitions, shuffle)(ord))

override def distinct(): SchemaRDD =
applySchema(super.distinct())

override def distinct(numPartitions: Int)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.distinct(numPartitions)(ord))

override def filter(f: Row => Boolean): SchemaRDD =
applySchema(super.filter(f))

override def intersection(other: RDD[Row]): SchemaRDD =
applySchema(super.intersection(other))

override def intersection(other: RDD[Row], partitioner: Partitioner)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.intersection(other, partitioner)(ord))

override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
applySchema(super.intersection(other, numPartitions))

override def repartition(numPartitions: Int)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.repartition(numPartitions)(ord))

override def subtract(other: RDD[Row]): SchemaRDD =
applySchema(super.subtract(other))

override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
applySchema(super.subtract(other, numPartitions))

override def subtract(other: RDD[Row], p: Partitioner)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.subtract(other, p)(ord))
}
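
A hedged usage sketch of the net effect of these overrides. It assumes a "people" table has been registered elsewhere, and the SQLContext setup and method names (registerAsTable, Row.getInt) follow the Spark SQL API of this era:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SchemaRddSetOpsDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
  val sqlContext = new SQLContext(sc)

  // Assumes a "people" table with (name, age) has been registered elsewhere.
  val people = sqlContext.sql("SELECT name, age FROM people")

  // Before this patch, filter/distinct returned a plain RDD[Row]; with these overrides
  // they return SchemaRDD, so SQL-specific methods such as registerAsTable stay available.
  val adults = people.filter(row => row.getInt(1) >= 18).distinct()
  adults.registerAsTable("adults")
}
```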
sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -17,10 +17,13 @@

package org.apache.spark.sql.api.java

import org.apache.spark.Partitioner
import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
* An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)

val rdd = baseSchemaRDD.map(new Row(_))

override def toString: String = baseSchemaRDD.toString

// =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================

// Common RDD functions

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaSchemaRDD = {
baseSchemaRDD.cache()
this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): JavaSchemaRDD = {
baseSchemaRDD.persist()
this
}

/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet.
*/
def persist(newLevel: StorageLevel): JavaSchemaRDD = {
baseSchemaRDD.persist(newLevel)
this
}

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
*/
def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
baseSchemaRDD.unpersist(blocking)
this
}

/** Assign a name to this RDD */
def setName(name: String): JavaSchemaRDD = {
baseSchemaRDD.setName(name)
this
}

// Transformations (return a new RDD)

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD

/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): JavaSchemaRDD =
baseSchemaRDD.distinct().toJavaSchemaRDD

/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int): JavaSchemaRDD =
baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD

/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD

/**
* Return the intersection of this RDD and another one. The output will not contain any
* duplicate elements, even if the input RDDs did.
*
* Note that this method performs a shuffle internally.
*/
def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD

/**
* Return the intersection of this RDD and another one. The output will not contain any
* duplicate elements, even if the input RDDs did.
*
* Note that this method performs a shuffle internally.
*
* @param partitioner Partitioner to use for the resulting RDD
*/
def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD

/**
* Return the intersection of this RDD and another one. The output will not contain any
* duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster
*
* Note that this method performs a shuffle internally.
*
* @param numPartitions How many partitions to use in the resulting RDD
*/
def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD

/**
* Return a new RDD that has exactly `numPartitions` partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int): JavaSchemaRDD =
baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD

/**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD

/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD

/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
}