Skip to content

Commit 0e88661

Browse files
committed
[SPARK-4050][SQL] Fix caching of temporary tables with projections.
Previously cached data was found by `sameResult` plan matching on optimized plans. This technique however fails to locate the cached data when a temporary table with a projection is queried with a further reduced projection. The failure is due to the fact that optimization will collapse the projections, producing a plan that no longer produces the sameResult as the cached data (though the cached data still subsumes the desired data). For example consider the following previously failing test case.

```scala
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
```

In this PR I change the matching to occur after analysis instead of optimization, so that in the case of temporary tables, the plans will always match. I think this should work generally, however, this error does raise questions about the need to do more thorough subsumption checking when locating cached data. Another question is what sort of semantics we want to provide when uncaching data from temporary tables. For example consider the following sequence of commands:

```scala
testData.select('key).registerTempTable("tempTable1")
testData.select('key).registerTempTable("tempTable2")
cacheTable("tempTable1")

// This obviously works.
assertCached(sql("SELECT COUNT(*) FROM tempTable1"))

// It seems good that this works ...
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

// ... but is this valid?
uncacheTable("tempTable2")

// Should this still be cached?
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
```

Author: Michael Armbrust <[email protected]>

Closes apache#2912 from marmbrus/cachingBug and squashes the following commits:

9c822d4 [Michael Armbrust] remove commented out code
5c72fb7 [Michael Armbrust] Add a test case / question about uncaching semantics.
63a23e4 [Michael Armbrust] Perform caching on analyzed instead of optimized plan.
03f1cfe [Michael Armbrust] Clean-up / add tests to SameResult suite.
1 parent d60a9d4 commit 0e88661

File tree

4 files changed

+48
-12
lines changed

4 files changed

+48
-12
lines changed

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2626
import org.apache.spark.sql.catalyst.util._
2727

2828
/**
29-
* Provides helper methods for comparing plans.
29+
* Tests for the sameResult function of [[LogicalPlan]].
3030
*/
3131
class SameResultSuite extends FunSuite {
3232
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -52,11 +52,15 @@ class SameResultSuite extends FunSuite {
5252
assertSameResult(testRelation.select('a, 'b), testRelation2.select('a, 'b))
5353
assertSameResult(testRelation.select('b, 'a), testRelation2.select('b, 'a))
5454

55-
assertSameResult(testRelation, testRelation2.select('a), false)
56-
assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), false)
55+
assertSameResult(testRelation, testRelation2.select('a), result = false)
56+
assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), result = false)
5757
}
5858

5959
test("filters") {
6060
assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b))
6161
}
62+
63+
test("sorts") {
64+
assertSameResult(testRelation.orderBy('a.asc), testRelation2.orderBy('a.asc))
65+
}
6266
}

sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ private[sql] trait CacheManager {
8282
private[sql] def cacheQuery(
8383
query: SchemaRDD,
8484
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
85-
val planToCache = query.queryExecution.optimizedPlan
85+
val planToCache = query.queryExecution.analyzed
8686
if (lookupCachedData(planToCache).nonEmpty) {
8787
logWarning("Asked to cache already cached data.")
8888
} else {
@@ -96,8 +96,8 @@ private[sql] trait CacheManager {
9696

9797
/** Removes the data for the given SchemaRDD from the cache */
9898
private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
99-
val planToCache = query.queryExecution.optimizedPlan
100-
val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
99+
val planToCache = query.queryExecution.analyzed
100+
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
101101
require(dataIndex >= 0, s"Table $query is not cached.")
102102
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
103103
cachedData.remove(dataIndex)
@@ -106,12 +106,12 @@ private[sql] trait CacheManager {
106106

107107
/** Optionally returns cached data for the given SchemaRDD */
108108
private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
109-
lookupCachedData(query.queryExecution.optimizedPlan)
109+
lookupCachedData(query.queryExecution.analyzed)
110110
}
111111

112112
/** Optionally returns cached data for the given LogicalPlan. */
113113
private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
114-
cachedData.find(_.plan.sameResult(plan))
114+
cachedData.find(cd => plan.sameResult(cd.plan))
115115
}
116116

117117
/** Replaces segments of the given logical plan with cached versions where possible. */

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
374374
def logical: LogicalPlan
375375

376376
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
377-
lazy val optimizedPlan = optimizer(analyzed)
378-
lazy val withCachedData = useCachedData(optimizedPlan)
377+
lazy val withCachedData = useCachedData(analyzed)
378+
lazy val optimizedPlan = optimizer(withCachedData)
379379

380380
// TODO: Don't just pick the first one...
381381
lazy val sparkPlan = {
382382
SparkPlan.currentContext.set(self)
383-
planner(withCachedData).next()
383+
planner(optimizedPlan).next()
384384
}
385385
// executedPlan should not be used to initialize any SparkPlan. It should be
386386
// only used for execution.

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,47 @@ class CachedTableSuite extends QueryTest {
5353
sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
5454
}
5555

56+
test("cache temp table") {
57+
testData.select('key).registerTempTable("tempTable")
58+
assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
59+
cacheTable("tempTable")
60+
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
61+
uncacheTable("tempTable")
62+
}
63+
64+
test("cache table as select") {
65+
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
66+
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
67+
uncacheTable("tempTable")
68+
}
69+
70+
test("uncaching temp table") {
71+
testData.select('key).registerTempTable("tempTable1")
72+
testData.select('key).registerTempTable("tempTable2")
73+
cacheTable("tempTable1")
74+
75+
assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
76+
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
77+
78+
// Is this valid?
79+
uncacheTable("tempTable2")
80+
81+
// Should this be cached?
82+
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
83+
}
84+
5685
test("too big for memory") {
5786
val data = "*" * 10000
5887
sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
5988
table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
6089
assert(table("bigData").count() === 200000L)
61-
table("bigData").unpersist()
90+
table("bigData").unpersist(blocking = true)
6291
}
6392

6493
test("calling .cache() should use in-memory columnar caching") {
6594
table("testData").cache()
6695
assertCached(table("testData"))
96+
table("testData").unpersist(blocking = true)
6797
}
6898

6999
test("calling .unpersist() should drop in-memory columnar cache") {
@@ -108,6 +138,8 @@ class CachedTableSuite extends QueryTest {
108138
case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
109139
}.size
110140
}
141+
142+
uncacheTable("testData")
111143
}
112144

113145
test("read from cached table and uncache") {

0 commit comments

Comments
 (0)