diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 44eceb0b372e6..0305f31ab6766 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -64,9 +64,17 @@ class Analyzer(catalog: Catalog,
       UnresolvedHavingClauseAttributes ::
       TrimGroupingAliases ::
       typeCoercionRules ++
-      extendedResolutionRules : _*)
+      extendedResolutionRules : _*),
+    Batch("SetAnalyzed", Once, SetLogicalPlanAnalyzed)
   )
 
+  object SetLogicalPlanAnalyzed extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      plan.analyzed = true
+      plan
+    }
+  }
+
   /**
    * Removes no-op Alias expressions from the plan.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b01a61d7bf8d6..cb26c2af02e9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -29,6 +29,15 @@ import org.apache.spark.sql.catalyst.trees
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   self: Product =>
 
+  protected[sql] var analyzed: Boolean = false
+
+  def originalPlan: LogicalPlan = this transform {
+    case p if p.analyzed =>
+      p._origPlan.getOrElse(p)
+  }
+
+  protected[sql] var _origPlan: Option[LogicalPlan] = None
+
   /**
    * Computes [[Statistics]] for this plan. The default implementation assumes the output
    * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 423ef3912bc89..cf04977170a0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -149,6 +149,8 @@ class DataFrame private[sql](
     queryExecution.analyzed
   }
 
+  @transient val originalPlan: LogicalPlan = logicalPlan.originalPlan
+
   /**
    * An implicit conversion function internal to this class for us to avoid doing
    * "new DataFrame(...)" everywhere.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b8100782ec937..f8dc66bcfee32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1107,7 +1107,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] class QueryExecution(val logical: LogicalPlan) {
     def assertAnalyzed(): Unit = checkAnalysis(analyzed)
 
-    lazy val analyzed: LogicalPlan = analyzer(logical)
+    lazy val analyzed: LogicalPlan = if (!logical.analyzed) {
+      val plan = analyzer(logical)
+      plan._origPlan = Some(logical)
+      plan
+    } else {
+      logical
+    }
+
     lazy val withCachedData: LogicalPlan = {
       assertAnalyzed()
       cacheManager.useCachedData(analyzed)
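
For reviewers, a minimal standalone sketch of the behaviour this diff introduces follows. The Plan class and analyze function below are hypothetical stand-ins for LogicalPlan and QueryExecution.analyzed, not Spark code: the point is only that re-running analysis on an already-analyzed plan is a no-op, and that the pre-analysis plan stays reachable through originalPlan.

// Standalone Scala sketch; names are illustrative, not Spark's actual classes.
object AnalyzedFlagSketch {

  // Stand-in for LogicalPlan: a name plus the two new fields added by this patch.
  final case class Plan(name: String) {
    var analyzed: Boolean = false
    var origPlan: Option[Plan] = None
    def originalPlan: Plan = if (analyzed) origPlan.getOrElse(this) else this
  }

  // Stand-in for QueryExecution.analyzed: only run the (fake) analyzer when the
  // plan has not been analyzed yet, and remember the pre-analysis plan.
  def analyze(logical: Plan): Plan =
    if (!logical.analyzed) {
      val plan = Plan(logical.name + " [resolved]")
      plan.analyzed = true
      plan.origPlan = Some(logical)
      plan
    } else {
      logical
    }

  def main(args: Array[String]): Unit = {
    val raw = Plan("Project(*, UnresolvedRelation(t))")
    val once = analyze(raw)
    val twice = analyze(once)          // second pass is a no-op
    assert(twice eq once)              // already-analyzed plan is returned as-is
    assert(once.originalPlan eq raw)   // pre-analysis plan is recoverable
    println(s"analyzed: ${once.name}, original: ${once.originalPlan.name}")
  }
}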