
[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations... #448


Closed
wants to merge 3 commits

Conversation

kanzhang (Contributor)

... that do not change schema

@kanzhang (Contributor Author)

First try, please comment. Not very comfortable with methods that take other RDDs, like intersect, subtract and union, since the caller has to make sure they are of the same schema.

@AmplabJenkins

Can one of the admins verify this patch?

@marmbrus (Contributor)

ok to test

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@marmbrus (Contributor)

Thanks for doing this!

I think we are actually okay for intersect and subtract, as anything in the result must be a row that was in the original RDD and thus must have a correct schema. If you intersect with a different schema you will get back an empty RDD. If you subtract with a different schema the subtraction will be a no-op and you'll get back the original RDD.

Union is a little more troublesome. We could check the schema and throw an error if they don't match, but that is kinda changing the semantics relative to the standard union call on RDD. Also, when we do a SQL union we do type widening, so just calling RDD union and returning a SchemaRDD is a little weird.

So, I'd propose we leave union out, as users that want SQL semantics here can already call unionAll. @mateiz might have thoughts here too.

A few other methods we can add that also don't change the schema:

  • distinct() with no numPartitions
  • repartition
  • setName(...) ?
  • randomSplit (not sure if this is okay since Array is invariant)
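A minimal sketch of what these overrides could look like, following the applySchema pattern used elsewhere in this patch (signatures are approximate for the Spark 1.0-era API; the implicit Ordering parameter reflects the "newly added Ordering param" mentioned later in the commit log):

// Sketch only: each override delegates to the base RDD method and re-wraps
// the result, since none of these operations changes the row schema.
override def distinct(): SchemaRDD =
  applySchema(super.distinct())

override def repartition(numPartitions: Int)(implicit ord: Ordering[Row] = null): SchemaRDD =
  applySchema(super.repartition(numPartitions))

// setName mutates the receiver rather than building a new RDD, so it can
// simply call super for the side effect and return this.
override def setName(name: String): SchemaRDD = {
  super.setName(name)
  this
}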

override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
applySchema(super.intersection(other, numPartitions))

override def sample(withReplacement: Boolean, fraction: Double, seed: Int): SchemaRDD =
Contributor

There is already a sample method that returns SchemaRDD above.

Contributor Author

Yes, I noticed it. It has a different signature and doesn't override the base sample() method. I thought this was done on purpose - you wanted to keep both implementations, right?

Contributor Author

Ah, I see the execution eventually calls the base method, so it is equivalent?

Contributor

Yeah, you are going to end up getting the same thing. I'd say we drop this one and leave the other. Right now it probably doesn't matter, but the other one is lazy and gives the optimizer a chance to possibly improve things before actually executing.

Contributor Author

Isn't the base impl lazy also (till compute() is called)? It's kind of hard to tell the difference for users. What's your thinking behind keeping the base implementation around (not overriding it)?

Contributor

Oh, you are right. The base impl is probably lazy too. The distinction I was trying to make is that while normal RDD operations are lazy, they are not holistically optimized before execution. Whereas if we create a logical operator and defer the creation of RDDs, there may be some extra chances for optimization (at some point in the future). We definitely want to override the base impl, but we don't need to have multiple redundant methods for creating samples.

Also note that you might need to sync with the changes being made in #462 .
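To illustrate the distinction with a sketch (not the exact code in this patch; the Sample logical operator and the sqlContext/logicalPlan constructor shape are assumptions):

// RDD-level sampling: the sampled RDD is constructed via the base method and
// merely re-wrapped, so the optimizer never sees a sampling step in the plan.
override def sample(withReplacement: Boolean, fraction: Double, seed: Int): SchemaRDD =
  applySchema(super.sample(withReplacement, fraction, seed))

// Plan-level sampling: only a logical operator is recorded, so the optimizer
// can still rewrite the plan before any RDD is built.
def sample(fraction: Double, withReplacement: Boolean, seed: Int): SchemaRDD =
  new SchemaRDD(sqlContext, Sample(fraction, withReplacement, seed, logicalPlan))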

Contributor Author

Thanks for the heads-up. Since #462 is already doing it, I'll skip it (I meant overriding it with the query-based one, too).

@marmbrus (Contributor)

Also, we should make the same changes to the Java and Python API if possible.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14248/

@mateiz (Contributor)

mateiz commented Apr 19, 2014

I agree with leaving union out and adding repartition, coalesce, and the other version of distinct. Also, these should definitely be added to Java and Python too.

@kanzhang (Contributor Author)

Thanks for your suggestions. I'll update.

Btw, I don't see PythonSchemaRDD in the code base yet; can I leave out Python for now?

@kanzhang (Contributor Author)

@marmbrus you are right, I can't override randomSplit() due to invariance of Array.
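For the record, a minimal self-contained illustration of the invariance issue (toy classes, not Spark code; the randomSplit signature is quoted from memory):

object InvarianceDemo extends App {
  class A
  class B extends A

  // Array is invariant in its element type: even though B <: A,
  // Array[B] is NOT a subtype of Array[A].
  val bs: Array[B] = Array(new B)
  // val as: Array[A] = bs  // does not compile: type mismatch

  // By the same reasoning, an override of
  //   def randomSplit(weights: Array[Double], seed: Long): Array[RDD[T]]
  // cannot narrow its result to Array[SchemaRDD].
  println(bs.length)
}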

How about cache(), persist(), unpersist()?

@marmbrus (Contributor)

> How about cache(), persist(), unpersist()?

Good catch!
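A sketch of the straightforward overrides (before the this.type approach discussed further down); these base methods mutate state and return the same RDD, so each override can call super for the side effect and return this (signatures approximate for the Spark 1.0-era API):

// Sketch; assumes import org.apache.spark.storage.StorageLevel in scope.
override def cache(): SchemaRDD = {
  super.cache()
  this
}

override def persist(newLevel: StorageLevel): SchemaRDD = {
  super.persist(newLevel)
  this
}

override def unpersist(blocking: Boolean = true): SchemaRDD = {
  super.unpersist(blocking)
  this
}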

> Btw, I don't see PythonSchemaRDD in the code base yet; can I leave out Python for now?

It is just called SchemaRDD and is located in python/pyspark/sql.py.

override def subtract(other: RDD[Row], p: Partitioner): SchemaRDD =
applySchema(super.subtract(other, p))

override def union(other: RDD[Row]): SchemaRDD =
Contributor

Don't forget to remove this one.

Contributor Author

done

@kanzhang (Contributor Author)

Oh, I see. Thanks.

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@kanzhang (Contributor Author)

Hey, just pushed an update on the Scala and Java APIs. Wanted to get some feedback before I move on to Python. Please pay attention to the signatures of filter, intersection, and subtract in the Java API. Thanks.

@AmplabJenkins

Build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14414/


// Common RDD functions

override def cache(): SchemaRDD = {
Contributor

I should have mentioned this before, but we could consider using this.type instead in the base RDD class for these methods. I'm not sure if that is breaking the API or too much Scala magic, though. @mateiz?

Contributor

I'd be okay trying it; my questions then are what it looks like in Scaladoc and what it looks like in Java. We should also double-check that Scala keeps binary compatibility for this kind of return type.

Contributor Author

Thanks for pointing it out, will verify this.

@marmbrus (Contributor)

Looking pretty good. Thanks again for working on this!

Jenkins, test this please.

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@kanzhang (Contributor Author)

kanzhang commented May 1, 2014

@marmbrus @mateiz Here's an update on using this.type as the return type. See the following shell output. The result type of the setName() method has changed from org.apache.spark.rdd.RDD[Record] to rdd.type. Similar things happened for SchemaRDD and JavaSchemaRDD (which is not an RDD subclass). The benefit is we don't have to reimplement those methods in subclasses.

scala> rdd.setName("RDD")
res0: rdd.type = RDD ParallelCollectionRDD[0] at parallelize at <console>:16

scala> rdd
res1: org.apache.spark.rdd.RDD[Record] = RDD ParallelCollectionRDD[0] at parallelize at <console>:16

scala> srdd.setName("SCHEMA RDD")
res2: srdd.type = 
SCHEMA RDD SchemaRDD[2] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [key#0,value#1], MappedRDD[1] at map at basicOperators.scala:147

scala> srdd
res3: org.apache.spark.sql.SchemaRDD = 
SCHEMA RDD SchemaRDD[2] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [key#0,value#1], MappedRDD[1] at map at basicOperators.scala:147

scala> jsrdd.setName("JAVA SCHEMA RDD")
res4: jsrdd.type = 
JAVA SCHEMA RDD SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [key#0,value#1], MappedRDD[1] at map at basicOperators.scala:147

scala> jsrdd
res5: org.apache.spark.sql.api.java.JavaSchemaRDD = 
JAVA SCHEMA RDD SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [key#0,value#1], MappedRDD[1] at map at basicOperators.scala:147

@kanzhang (Contributor Author)

kanzhang commented May 1, 2014

PS. Subclasses that override those methods may have to be updated and recompiled (like what I did in EdgeRDD and VertexRDD). Better ideas?

@AmplabJenkins

Build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14611/

@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
    * it is computed. This can only be used to assign a new storage level if the RDD does not
    * have a storage level set yet..
    */
-  def persist(newLevel: StorageLevel): RDD[T] = {
+  def persist(newLevel: StorageLevel): this.type = {
     // TODO: Handle changes of StorageLevel
Contributor

I am fairly ignorant of Scala; I am not sure I follow. Where is type coming from, and what is it exactly?
Also, does this change mean it is an incompatible interface change?

Contributor

It's to allow child classes to not have to override functions like persist and cache that are used for chaining:

http://scalada.blogspot.com/2008/02/thistype-for-chaining-method-calls.html
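A self-contained sketch of the idiom with toy classes (not Spark code):

object ThisTypeDemo extends App {
  abstract class Node {
    private var label = ""
    // this.type is the singleton type of the receiver, so every call site
    // gets back the receiver's own static type, with no override needed.
    def setLabel(l: String): this.type = { label = l; this }
  }

  class Leaf extends Node {
    def leafOnly: String = "leaf-specific method"
  }

  val leaf: Leaf = new Leaf().setLabel("x") // static type is Leaf, not Node
  println(leaf.setLabel("y").leafOnly)      // chaining preserves the subtype
}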

Contributor

Neat, thanks!

Contributor

So I guess this can't be applied to checkpointRDD and randomSplit?
What about things like filter, distinct, repartition, sample, filterWith, etc.?

Contributor

@mridulm if you look at this patch, it explicitly overrides those for SchemaRDD. You can't use this.type there because the return type is actually a new RDD class (FilteredRDD and so on).

Contributor Author

@mridulm agree with Patrick; you have to return this for a this.type return type.
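A toy sketch of that constraint (hypothetical classes, not Spark code): this.type typechecks only when the method returns the receiver itself, which is why mutate-and-return methods qualify while transformations that construct a new object (like filter building a FilteredRDD) do not.

object ReturnTypeDemo extends App {
  class Box(var name: String) {
    // OK: returns the receiver itself, so this.type is satisfied.
    def setName(n: String): this.type = { name = n; this }

    // A transformation constructs a *new* object, so this.type would not
    // typecheck here; the return type has to stay Box (or a supertype):
    // def renamedCopy(n: String): this.type = new Box(n)  // does not compile
    def renamedCopy(n: String): Box = new Box(n)
  }
  class LabeledBox(name: String) extends Box(name)

  val lb: LabeledBox = new LabeledBox("a").setName("b") // stays LabeledBox
  println(lb.renamedCopy("c").name)                     // widens to Box
}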

Contributor

Thanks for clarifying, in retrospect that looks obvious!

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14731/

@pwendell (Contributor)

pwendell commented May 6, 2014

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14738/

@pwendell (Contributor)

pwendell commented May 6, 2014

@kanzhang hey, you'll need to silence some of the binary compatibility checks in project/MimaBuild.scala:

excludeSparkClass("org.apache.spark.graphx.VertexRDD")
excludeSparkClass("org.apache.spark.graphx.EdgeRDD")

@kanzhang (Contributor Author)

kanzhang commented May 7, 2014

@pwendell thanks for the heads-up. Made those changes; let's see how it goes.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14746/

@pwendell (Contributor)

pwendell commented May 7, 2014

Jenkins, retest this please.

@kanzhang (Contributor Author)

kanzhang commented May 7, 2014

@pwendell the build didn't seem to start?

@pwendell (Contributor)

pwendell commented May 7, 2014

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14762/

@pwendell (Contributor)

pwendell commented May 7, 2014

Thanks for updating this. I'm merging it.

asfgit closed this in 967635a May 7, 2014
asfgit pushed a commit that referenced this pull request May 7, 2014

[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations that do not change schema

Author: Kan Zhang <[email protected]>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
(cherry picked from commit 967635a)

Signed-off-by: Patrick Wendell <[email protected]>
kanzhang deleted the SPARK-1460 branch May 9, 2014 04:31
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014

[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations that do not change schema

Author: Kan Zhang <[email protected]>

Closes apache#448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020