
Commit ac0174e

gengliangwang authored and gatorsmile committed
[SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable
## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro. As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable.

This PR also improves the error message shown when the Avro/Kafka data source is not found.

## How was this patch tested?

Unit tests.

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
1 parent 6c5cb85 commit ac0174e
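
For context, here is a minimal usage sketch of how the new flag changes provider resolution. It is not part of this commit; the session setup and path are illustrative, and the read call assumes the external spark-avro module is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session for illustration.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("legacy-avro-mapping")
  .getOrCreate()

// With the default (true), the legacy Databricks provider name resolves to
// the built-in org.apache.spark.sql.avro.AvroFileFormat.
val df = spark.read.format("com.databricks.spark.avro").load("/tmp/events.avro")

// After disabling the mapping, the same read fails with
// "Failed to find data source: com.databricks.spark.avro" unless the old
// third-party library itself is on the classpath.
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
```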

File tree

4 files changed: +52 -3 lines changed


external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 10 additions & 1 deletion
@@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("resolve avro data source") {
-    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+    val databricksAvro = "com.databricks.spark.avro"
+    // By default the backward compatibility for com.databricks.spark.avro is enabled.
+    Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
       assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
         classOf[org.apache.spark.sql.avro.AvroFileFormat])
     }
+
+    withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") {
+      val message = intercept[AnalysisException] {
+        DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $databricksAvro"))
+    }
   }
 
   test("reading from multiple paths") {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -1469,6 +1469,13 @@ object SQLConf {
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 
+  val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED =
+    buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled")
+      .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " +
+        "to the built-in but external Avro data source module for backward compatibility.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_SETOPS_PRECEDENCE_ENABLED =
     buildConf("spark.sql.legacy.setopsPrecedence.enabled")
       .internal()
@@ -1881,6 +1888,9 @@ class SQLConf extends Serializable with Logging {
 
   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
 
+  def replaceDatabricksSparkAvroEnabled: Boolean =
+    getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
+
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
   def parallelFileListingInStatsComputation: Boolean =
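
Because the entry is built with buildConf rather than a static conf, it can be toggled per session. A short sketch, assuming an existing SparkSession named spark:

```scala
// Toggle the legacy mapping at runtime; the key matches the conf added above.
spark.sql("SET spark.sql.legacy.replaceDatabricksSparkAvro.enabled=false")

// Equivalent through the runtime conf API.
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "true")

// Spark-internal code reads the value through the new accessor:
//   spark.sessionState.conf.replaceDatabricksSparkAvroEnabled
```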

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 14 additions & 2 deletions
@@ -571,7 +571,6 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
-    val avro = "org.apache.spark.sql.avro.AvroFileFormat"
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -593,7 +592,6 @@ object DataSource extends Logging {
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "com.databricks.spark.avro" -> avro,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
@@ -616,6 +614,8 @@ object DataSource extends Logging {
       case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
+        "org.apache.spark.sql.avro.AvroFileFormat"
       case name => name
     }
     val provider2 = s"$provider1.DefaultSource"
@@ -637,6 +637,18 @@ object DataSource extends Logging {
             "Hive built-in ORC data source must be used with Hive support enabled. " +
             "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
             "'native'")
+        } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
+          provider1 == "com.databricks.spark.avro" ||
+          provider1 == "org.apache.spark.sql.avro") {
+          throw new AnalysisException(
+            s"Failed to find data source: $provider1. Avro is built-in but external data " +
+              "source module since Spark 2.4. Please deploy the application as per " +
+              "the deployment section of \"Apache Avro Data Source Guide\".")
+        } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
+          throw new AnalysisException(
+            s"Failed to find data source: $provider1. Please deploy the application as " +
+              "per the deployment section of " +
+              "\"Structured Streaming + Kafka Integration Guide\".")
         } else {
           throw new ClassNotFoundException(
             s"Failed to find data source: $provider1. Please find packages at " +

sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -76,6 +76,24 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
       classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
   }
 
+  test("avro: show deploy guide for loading the external avro module") {
+    Seq("avro", "org.apache.spark.sql.avro").foreach { provider =>
+      val message = intercept[AnalysisException] {
+        getProvidingClass(provider)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $provider"))
+      assert(message.contains("Please deploy the application as per the deployment section of"))
+    }
+  }
+
+  test("kafka: show deploy guide for loading the external kafka module") {
+    val message = intercept[AnalysisException] {
+      getProvidingClass("kafka")
+    }.getMessage
+    assert(message.contains("Failed to find data source: kafka"))
+    assert(message.contains("Please deploy the application as per the deployment section of"))
+  }
+
   test("error message for unknown data sources") {
     val error = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")
