[SPARK-10339] [SPARK-10334] [SPARK-10301] [SQL] Partitioned table scan can OOM driver and throw a better error message when users need to enable parquet schema merging #8515
Conversation
…d of create one Filter/Project for every partition.
"A Parquet file's schema has different number of fields with the table schema. " + | ||
"Please enable schema merging by setting \"mergeSchema\" to true when load " + | ||
"a Parquet dataset or set spark.sql.parquet.mergeSchema to tru in SQLConf.") | ||
} |
@liancheng @marmbrus I added a check to make sure that a Parquet file's struct has the same number of fields as the corresponding struct in the table schema. If this check fails, we ask users to enable mergeSchema.
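For reference, a minimal sketch of the shape such a check could take — this is an assumed illustration, not the actual Spark code; checkFieldCount is a hypothetical helper and the exception type is chosen only for the sketch:

    import org.apache.spark.sql.types.StructType

    // Hypothetical helper: fail fast when a Parquet file's struct and the
    // corresponding struct in the table schema disagree on field count.
    def checkFieldCount(fileStruct: StructType, tableStruct: StructType): Unit = {
      if (fileStruct.fields.length != tableStruct.fields.length) {
        throw new RuntimeException(
          "A Parquet file's schema has a different number of fields than the table schema. " +
            "Please enable schema merging by setting \"mergeSchema\" to true when loading " +
            "a Parquet dataset, or set spark.sql.parquet.mergeSchema to true in SQLConf.")
      }
    }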
…ields with the table schema.
test this please
"Please enable schema merging by setting \"mergeSchema\" to true when load " + | ||
"a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.") | ||
} | ||
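For context, this is what the error message asks users to do; both knobs are standard Spark 1.5 APIs (the snippet assumes a SQLContext named sqlContext and Parquet data at path):

    // Per-read: enable schema merging for this load only.
    val merged = sqlContext.read.option("mergeSchema", "true").parquet(path)
    // Global: enable schema merging for all Parquet reads via SQLConf.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")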
Just a note: this is a quick fix version of #8509.
Test build #41770 has finished for PR 8515 at commit
Test build #41771 has finished for PR 8515 at commit
Would be nice to have the following test case:

    test("SPARK-10334 Projections and filters should be kept in physical plan") {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
        val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
        val physicalPlan = df.queryExecution.executedPlan
        assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
        assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
      }
    }

And probably add … Otherwise LGTM. Verified locally that filter push-down and column pruning both work properly.
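As a side note, one way to eyeball that verification yourself (not part of this PR's test suite; assumes a SQLContext named sqlContext and partitioned Parquet data at path) is to print the extended physical plan and look for the pushed filters and pruned columns in the scan node:

    import sqlContext.implicits._

    val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
    df.explain(true)  // pushed filters and pruned columns show up in the scan node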
Test build #41786 has finished for PR 8515 at commit
Thanks, merging to master and branch-1.5.
…n can OOM driver and throw a better error message when users need to enable parquet schema merging This fixes the problem that scanning a partitioned table puts the driver under high memory pressure and can take down the cluster. Also, with this fix, we will be able to correctly show the query plan of a query consuming partitioned tables. https://issues.apache.org/jira/browse/SPARK-10339 https://issues.apache.org/jira/browse/SPARK-10334 Finally, this PR squeezes in a "quick fix" for SPARK-10301. It is not a real fix; it just throws a better error message to let users know what to do. Author: Yin Huai <[email protected]> Closes #8515 from yhuai/partitionedTableScan. (cherry picked from commit 097a7e3) Signed-off-by: Michael Armbrust <[email protected]>
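To illustrate why one Filter/Project per partition pressures the driver, here is a toy, self-contained sketch; the plan ADT below is made up for illustration and is not Spark's internal representation:

    sealed trait Plan
    case class Scan(partition: Int) extends Plan
    case class Filter(child: Plan) extends Plan
    case class Project(child: Plan) extends Plan
    case class Union(children: Seq[Plan]) extends Plan

    // Count the nodes in a plan tree.
    def size(p: Plan): Int = p match {
      case Scan(_)    => 1
      case Filter(c)  => 1 + size(c)
      case Project(c) => 1 + size(c)
      case Union(cs)  => 1 + cs.map(size).sum
    }

    val partitions = 1 to 10000
    // Before the fix: a Filter/Project pair wraps every partition's scan.
    val perPartition = Union(partitions.map(p => Project(Filter(Scan(p)))))
    // After the fix: a single Filter/Project sits above the union of scans.
    val single = Project(Filter(Union(partitions.map(Scan(_)))))
    println(size(perPartition))  // 30001 nodes
    println(size(single))        // 10003 nodes

The node count alone understates the difference: in the real planner each plan node also carries expression trees and metadata, so collapsing the per-partition wrappers presumably saves far more driver memory than the raw counts suggest.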
…or nested structs We used to work around SPARK-10301 with a quick fix in branch-1.5 (PR #8515), but it doesn't cover the case described in SPARK-10428. So this PR backports PR #8509, which had once been considered too big a change to merge into branch-1.5 at the last minute, to fix both SPARK-10301 and SPARK-10428 for Spark 1.5. Also added more test cases for SPARK-10428. This PR looks big, but the essential change is only ~200 loc. All other changes are for testing. In particular, PR #8454 is also backported here because the `ParquetInteroperabilitySuite` introduced in PR #8515 depends on it. This should be safe since #8454 only touches testing code. Author: Cheng Lian <[email protected]> Closes #8583 from liancheng/spark-10301/for-1.5.