Skip to content

Conversation

nemanjapetr-db
Copy link
Contributor

@nemanjapetr-db nemanjapetr-db commented Jan 3, 2025

What changes were proposed in this pull request?

Instruction for reviewers https://docs.google.com/document/d/1qcEJxqoXcr5cSt6HgIQjWQSqhfkSaVYkoDHsg5oxXp4/edit
Introduction of UnionLoop and UnionLoopRef logical plan classes. Changes in ResolveWithCTE.scala to have the analyzer grok recursive anchors. Specifically we substitute CTERelationRef with UnionLoopRef, and Union with UnionLoop. We untangle the dead loop in resolving where recursive CTE reference is referring to an unresolved CTE definition, which itself cannot be resolved as one of its descendants is an unresolved CTE reference.

Why are the changes needed?

Support for the recursive CTE.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Proposed changes in this PR are no-op. Tested
./build/sbt "test:testOnly org.apache.spark.sql.SQLQueryTestSuite"
./build/sbt "test:testOnly *PlanParserSuite"

Was this patch authored or co-authored using generative AI tooling?

No

…ocs.google.com/document/d/1qcEJxqoXcr5cSt6HgIQjWQSqhfkSaVYkoDHsg5oxXp4/edit . Changes in ResolveWithCTE.scala to have the analyzer grok recursive anchors. Introduction of UnionLoop and UnionLoopRef logical plan classes.
@github-actions github-actions bot added the SQL label Jan 3, 2025
cteDef
}
} else {
if (cteDef.recursionAnchor.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can this happen? i.e. A non-recursive CTE relation contains recursionAnchor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you believe it is non-recursive, it is within if (cteDEf.recursive) block?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry I misread the code. Now the question becomes: why do we empty out the recursionAnchor if the CTE def is resolved?

Copy link
Contributor Author

@nemanjapetr-db nemanjapetr-db Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recursionAnchor removed, comment is now moot.

* mapping between the original and reference sequences are symmetric.
*/
private def rewriteConstraints(
reference: Seq[Attribute],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* @param limit An optional limit that can be pushed down to the node to stop the loop earlier.
*/
case class UnionLoop(
id: Long,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 4 spaces indentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* results.
*/
case class UnionLoopRef(
loopId: Long,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@cloud-fan
Copy link
Contributor

After CTESubstitution, we will create a CTERelationDef whose child is a union query with CTERelationRef referencing itslef. IIUC our goal is to rewrite this recursive CTERelationDef's child to be a UnionLoop with UnionLoopRef. Why do we need to put this anchor field in CTERelationDef?

Copy link
Contributor Author

@nemanjapetr-db nemanjapetr-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take another look.

cteDef
}
} else {
if (cteDef.recursionAnchor.nonEmpty) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you believe it is non-recursive, it is within if (cteDEf.recursive) block?

* mapping between the original and reference sequences are symmetric.
*/
private def rewriteConstraints(
reference: Seq[Attribute],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* @param limit An optional limit that can be pushed down to the node to stop the loop earlier.
*/
case class UnionLoop(
id: Long,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* results.
*/
case class UnionLoopRef(
loopId: Long,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@nemanjapetr-db nemanjapetr-db changed the title [SPARK-24497][SQL][WIP] Recursive CTE. Changes in ResolveWithCTE.scala to have the analyzer grok recursive anchors. [SPARK-50739][SQL][WIP] Recursive CTE. Changes in ResolveWithCTE.scala to have the analyzer grok recursive anchors. Jan 6, 2025
@nemanjapetr-db nemanjapetr-db changed the title [SPARK-50739][SQL][WIP] Recursive CTE. Changes in ResolveWithCTE.scala to have the analyzer grok recursive anchors. [WIP][SPARK-50739][SQL] Recursive CTE. Changes in ResolveWithCTE.scala to have Jan 6, 2025
Copy link
Contributor Author

@nemanjapetr-db nemanjapetr-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will get back to recursionAnchor variable in the next round, I would like to understand the benefits of removing it since it's removal is quite hairy.

cteDef
}
} else {
if (cteDef.recursionAnchor.nonEmpty) {
Copy link
Contributor Author

@nemanjapetr-db nemanjapetr-db Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recursionAnchor removed, comment is now moot.

…ic of detecting and storing fully and resolved CTE Definitions. Add error cases.
…ecursion achor, instead locate it under CTE child via pattern matching when it is needed. recursiveAnchorResolved() now returns anchor iff it is resolved and can be used to populate and read from a temporary map of CTE definitions.
@nemanjapetr-db
Copy link
Contributor Author

Please take another look.

CTERelationDef does not contain anchor any longer -- when needed it is fetched from its child via pattern matching. Code is greatly simplified and all previous convoluted questions on recursionAnchor are now moot.

I added several exceptions for the unsupported cases of Union under the CTE Definition.

Substitution rules for UnionLoop/Ref are added for 4 cases of Union under CTE Definition.

CTERelationDef change has broken some tests, will work on those now.

@vladimirg-db
Copy link
Contributor

Please update the PR title to have a full sentence.

// Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias->
// UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union.
cteDef.failAnalysis(
errorClass = "INVALID_RECURSIVE_CTE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a new error class to error-conditions.json and a new function to QueryCompilationErrors?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes let's please do this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

cteDefs.foreach { cteDef =>
if (cteDef.resolved) {
cteDefMap.put(cteDef.id, cteDef)
val newCTEDefs = cteDefs.map { cteDef =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like this map body should have two distinct cases - recursive and not recursive. We can rewrite it like this:

cteDefs.map {
  case cteDef if !cteDef.isRecursive =>
    ...
  case cteDef =>
    ...
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

val newCTEDef = if (cteDef.recursive) {
cteDef.child match {
// Substitutions to UnionLoop and UnionLoopRef.
case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that developers in Catalyst really enjoy one-letter variables in matches, but it does not feel like a good code health.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed, though this code may be changed to address the other feedback.

val newCTEDef = if (cteDef.recursive) {
cteDef.child match {
// Substitutions to UnionLoop and UnionLoopRef.
case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
Copy link
Contributor

@vladimirg-db vladimirg-db Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can introduce an extractor object to reduce complexity here:

object ReplaceUnionWithUnionLoop {
  def unapply(plan: LogicalPlan): Option[UnionLoop] = plan match {
    case union: Union(Seq(anchor, recursion), false, false) =>
      Some(UnionLoop(cteDef.id, anchor, transformRefs(recursion)))
    case distinctUnion: Distinct(Union(Seq(anchor, recursion), false, false)) =>
      Some(UnionLoop(cteDef.id, Distinct(anchor), Except(transformRefs(recursion), UnionLoopRef(cteDef.id, cteDef.output, true), false)))
    case _ =>
      None
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this code seems duplicated, we can use a helper to dedup it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address this one in the next round.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change was not possible. Regardless the comment is now moot as we changed the underlying code.

@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
}
}

// Substitute CTERelationRef with UnionLoopRef.
private def transformRefs(plan: LogicalPlan) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we place methods in a top-down order as they are used? More natural for reading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to be more readable and moved below.

@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
}
}

// Substitute CTERelationRef with UnionLoopRef.
private def transformRefs(plan: LogicalPlan) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private def transformRefs(plan: LogicalPlan) = {
private def replaceSimpleRefsWithUnionLoopRefs(plan: LogicalPlan) = {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
}
}

// Substitute CTERelationRef with UnionLoopRef.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain in the method doc why exactly are we replacing all the simple refs with union refs under a UnionLoop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, let's expand all the comments in this PR heavily to give a lot of context about the total algorithm to be performed, steps taken, etc.

ref.copy(_resolved = true, output = cteDef.output, isStreaming = cteDef.isStreaming)
} else {
// In the non-recursive case, cteDefMap contains only resolved Definitions.
cteDef.failAnalysis(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It changes the non-recursive behavior - if the def is unresolved now we would throw an error. Also, completely unrelated to the problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the non-recursive case:

We are pulling this def out from cteDefMap, and it is in that map precisely because it is resolved. The map insertion code above is:

if (newCTEDef.resolved) {
cteDefMap.put(newCTEDef.id, newCTEDef)
}

This is a sanity check that is enforcing the invariant.

Copy link
Contributor

@dtenedor dtenedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this!!

@@ -762,4 +762,15 @@ object QueryPlan extends PredicateHelper {
case e: AnalysisException => append(e.toString)
}
}

/**
* Generate detailed field string with different format based on type of input value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment and method name are a bit generic; could we expand them to mention what type of field we are referring to here, and when this might be used? can we give a brief example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
}
}

// Substitute CTERelationRef with UnionLoopRef.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, let's expand all the comments in this PR heavily to give a lot of context about the total algorithm to be performed, steps taken, etc.

val newCTEDef = if (cteDef.recursive) {
cteDef.child match {
// Substitutions to UnionLoop and UnionLoopRef.
case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this code seems duplicated, we can use a helper to dedup it

// Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias->
// UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union.
cteDef.failAnalysis(
errorClass = "INVALID_RECURSIVE_CTE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes let's please do this

@nemanjapetr-db nemanjapetr-db changed the title [WIP][SPARK-50739][SQL] Recursive CTE. Changes in ResolveWithCTE.scala to have [WIP][SPARK-50739][SQL] Recursive CTE. Analyzer changes to unravel and resolve the recursion components. Jan 11, 2025
…handle errors. Simplified pattern matching. Added comments to generateFieldString. Added more comemnts to basicLogicalOperators.scala
Copy link
Contributor Author

@nemanjapetr-db nemanjapetr-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deferred a few pieces of feedback for the next round. Several changes incorporated here including

  • Added error handlers in error-conditions.json and QueryCompilationError
  • Simplified pattern matching.
  • Added comments to generateFieldString.
  • Added more comments to basicLogicalOperators.scala

@@ -762,4 +762,15 @@ object QueryPlan extends PredicateHelper {
case e: AnalysisException => append(e.toString)
}
}

/**
* Generate detailed field string with different format based on type of input value
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ref.copy(_resolved = true, output = cteDef.output, isStreaming = cteDef.isStreaming)
} else {
// In the non-recursive case, cteDefMap contains only resolved Definitions.
cteDef.failAnalysis(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the non-recursive case:

We are pulling this def out from cteDefMap, and it is in that map precisely because it is resolved. The map insertion code above is:

if (newCTEDef.resolved) {
cteDefMap.put(newCTEDef.id, newCTEDef)
}

This is a sanity check that is enforcing the invariant.

// Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias->
// UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union.
cteDef.failAnalysis(
errorClass = "INVALID_RECURSIVE_CTE",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

val newCTEDef = if (cteDef.recursive) {
cteDef.child match {
// Substitutions to UnionLoop and UnionLoopRef.
case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address this one in the next round.

val newCTEDef = if (cteDef.recursive) {
cteDef.child match {
// Substitutions to UnionLoop and UnionLoopRef.
case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed, though this code may be changed to address the other feedback.

private def transformRefs(plan: LogicalPlan) = {
plan.transformWithPruning(_.containsPattern(CTE)) {
case r: CTERelationRef if r.recursive =>
UnionLoopRef(r.cteId, r.output, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the output should be the output of recursive anchor plan.

Comment on lines 49 to 53
val newCTEDef = cteDef
if (newCTEDef.resolved) {
cteDefMap.put(newCTEDef.id, newCTEDef)
}
newCTEDef
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val newCTEDef = cteDef
if (newCTEDef.resolved) {
cteDefMap.put(newCTEDef.id, newCTEDef)
}
newCTEDef
if (cteDef.resolved) {
cteDefMap.put(cteDef.id, cteDef)
}
cteDef

…hes unsupported CTE UNION placements in ResolveWithCTE.
…o catalyst/plans/logical/cteOperators.scala. Added unittest for the recursive CTE analyzer.
@nemanjapetr-db
Copy link
Contributor Author

Added Milan's analyzer unittest to this PR. Refactored CTE operators out of basicLogicalOperators into its own file. Added more comments.

def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())

val recursionPart = Project(anchor.output,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way the tree is decomposed here (using local variables) is very hard to read. Maybe we can actually have a solid tree structure instead of. this decomposition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline I will do it in a follow up PR.

new AnalysisException(
errorClass = "INVALID_RECURSIVE_CTE",
messageParameters = Map(
"error" -> error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't have this parameter any more.

@cloud-fan
Copy link
Contributor

thanks, merging to master/4.0! I'll create a followup PR shortly to do some cleanup

@cloud-fan cloud-fan changed the title [WIP][SPARK-50739][SQL] Recursive CTE. Analyzer changes to unravel and resolve the recursion components. [SPARK-50739][SQL] Recursive CTE. Analyzer changes to unravel and resolve the recursion components. Jan 17, 2025
@cloud-fan cloud-fan closed this in 3b114af Jan 17, 2025
cloud-fan pushed a commit that referenced this pull request Jan 17, 2025
…olve the recursion components

### What changes were proposed in this pull request?
Instruction for reviewers https://docs.google.com/document/d/1qcEJxqoXcr5cSt6HgIQjWQSqhfkSaVYkoDHsg5oxXp4/edit
Introduction of UnionLoop and UnionLoopRef logical plan classes. Changes in ResolveWithCTE.scala to have the analyzer grok recursive anchors. Specifically we substitute CTERelationRef with UnionLoopRef, and Union with UnionLoop. We untangle the dead loop in resolving where recursive CTE reference is referring to an unresolved CTE definition, which itself cannot be resolved as one of its descendants is an unresolved CTE reference.

### Why are the changes needed?
Support for the recursive CTE.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Proposed changes in this PR are no-op. Tested
./build/sbt "test:testOnly org.apache.spark.sql.SQLQueryTestSuite"
./build/sbt "test:testOnly *PlanParserSuite"

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49351 from nemanjapetr-db/nemanjapetr-db/rcte3.

Authored-by: Nemanja Petrovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 3b114af)
Signed-off-by: Wenchen Fan <[email protected]>
@nemanjapetr-db nemanjapetr-db deleted the nemanjapetr-db/rcte3 branch January 17, 2025 16:53
dongjoon-hyun pushed a commit that referenced this pull request Jan 18, 2025
### What changes were proposed in this pull request?

A followup of #49351 to simplify the test via dsl.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #49557 from cloud-fan/clean.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jan 18, 2025
### What changes were proposed in this pull request?

A followup of #49351 to simplify the test via dsl.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #49557 from cloud-fan/clean.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 66dd7dd)
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants