Commit 5046e81

NETFLIX-BUILD: Remove Iceberg special case in createDataSourceTables.

1 parent 6439bd1 commit 5046e81
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

1 file changed: 46 additions, 118 deletions
@@ -17,19 +17,14 @@
 
 package org.apache.spark.sql.execution.command
 
-import com.netflix.iceberg.PartitionSpec
-import com.netflix.iceberg.metacat.MetacatTables
-import com.netflix.iceberg.spark.SparkSchemaUtil
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.sources.v2.DataSourceV2
 
 /**
  * A command used to create a data source table.
@@ -47,8 +42,6 @@ import org.apache.spark.sql.sources.v2.DataSourceV2
 case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
   extends RunnableCommand {
 
-  import CreateDataSourceTableCommand._
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
@@ -62,128 +55,63 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val cls = DataSource.lookupDataSource(table.provider.get)
-    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      if (table.provider.get == "iceberg") {
-        val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
-        val hiveEnv = hadoopConf.get("spark.sql.hive.env", "prod")
-        val catalog = conf.getConfString("spark.sql.metacat.write.catalog", hiveEnv + "hive")
-        val db = table.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-        val tableName = table.identifier.table
-
-        val tables = new MetacatTables(
-          hadoopConf, sparkSession.sparkContext.applicationId, catalog)
-
-        val nonIdentityPartitions = table.partitionColumnNames.filter {
-          case Year(name) if table.schema.fieldNames.contains(name) => true
-          case Month(name) if table.schema.fieldNames.contains(name) => true
-          case Day(name) if table.schema.fieldNames.contains(name) => true
-          case Hour(name) if table.schema.fieldNames.contains(name) => true
-          case Bucket(name, _) if table.schema.fieldNames.contains(name) => true
-          case Truncate(name, _) if table.schema.fieldNames.contains(name) => true
-          case _ => false
-        }.toSet
-
-        // filter our the partition columns, except for identity partitions
-        val baseSchema = table.schema.toAttributes
-          .filterNot(a => nonIdentityPartitions.contains(a.name))
-          .toStructType
-
-        val schema = SparkSchemaUtil.convert(baseSchema)
-        val specBuilder = PartitionSpec.builderFor(schema)
-        table.partitionColumnNames.foreach {
-          case Year(name) if schema.findField(name) != null =>
-            specBuilder.year(name)
-          case Month(name) if schema.findField(name) != null =>
-            specBuilder.month(name)
-          case Day(name) if schema.findField(name) != null =>
-            specBuilder.day(name)
-          case Hour(name) if schema.findField(name) != null =>
-            specBuilder.hour(name)
-          case Bucket(name, numBuckets) if schema.findField(name) != null =>
-            specBuilder.bucket(name, numBuckets.toInt)
-          case Truncate(name, width) if schema.findField(name) != null =>
-            specBuilder.truncate(name, width.toInt)
-          case Identity(name) if schema.findField(name) != null =>
-            specBuilder.identity(name)
-          case name: String =>
-            specBuilder.identity(name)
-          case other =>
-            throw new SparkException(s"Cannot determine partition type: $other")
+    // Create the relation to validate the arguments before writing the metadata to the
+    // metastore, and infer the table schema and partition if users didn't specify schema in
+    // CREATE TABLE.
+    val pathOption = table.storage.locationUri.map("path" -> _)
+    // Fill in some default table options from the session conf
+    val tableWithDefaultOptions = table.copy(
+      identifier = table.identifier.copy(
+        database = Some(
+          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
+    val dataSource: BaseRelation =
+      DataSource(
+        sparkSession = sparkSession,
+        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        partitionColumns = table.partitionColumnNames,
+        className = table.provider.get,
+        bucketSpec = table.bucketSpec,
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
+
+    dataSource match {
+      case fs: HadoopFsRelation =>
+        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
+          throw new AnalysisException(
+            "Cannot create a file-based external data source table without path")
         }
+      case _ =>
+    }
 
-        tables.create(schema, specBuilder.build, db, tableName)
-      }
-
+    val partitionColumnNames = if (table.schema.nonEmpty) {
+      table.partitionColumnNames
     } else {
-      // Create the relation to validate the arguments before writing the metadata to the
-      // metastore, and infer the table schema and partition if users didn't specify schema in
-      // CREATE TABLE.
-      val pathOption = table.storage.locationUri.map("path" -> _)
-      // Fill in some default table options from the session conf
-      val tableWithDefaultOptions = table.copy(
-        identifier = table.identifier.copy(
-          database = Some(
-            table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
-        tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
-      val dataSource: BaseRelation =
-        DataSource(
-          sparkSession = sparkSession,
-          userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
-          partitionColumns = table.partitionColumnNames,
-          className = table.provider.get,
-          bucketSpec = table.bucketSpec,
-          options = table.storage.properties ++ pathOption,
-          catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
-
+      // This is guaranteed in `PreprocessDDL`.
+      assert(table.partitionColumnNames.isEmpty)
       dataSource match {
-        case fs: HadoopFsRelation =>
-          if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
-            throw new AnalysisException(
-              "Cannot create a file-based external data source table without path")
-          }
-        case _ =>
-      }
-
-      val partitionColumnNames = if (table.schema.nonEmpty) {
-        table.partitionColumnNames
-      } else {
-        // This is guaranteed in `PreprocessDDL`.
-        assert(table.partitionColumnNames.isEmpty)
-        dataSource match {
-          case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
-          case _ => Nil
-        }
+        case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
+        case _ => Nil
       }
-
-      val newTable = table.copy(
-        schema = dataSource.schema,
-        partitionColumnNames = partitionColumnNames,
-        // If metastore partition management for file source tables is enabled, we start off with
-        // partition provider hive, but no partitions in the metastore. The user has to call
-        // `msck repair table` to populate the table partitions.
-        tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-          sparkSession.sessionState.conf.manageFilesourcePartitions)
-      // We will return Nil or throw exception at the beginning if the table already exists, so
-      // when we reach here, the table should not exist and we should set `ignoreIfExists` to
-      // false.
-      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }
 
+    val newTable = table.copy(
+      schema = dataSource.schema,
+      partitionColumnNames = partitionColumnNames,
+      // If metastore partition management for file source tables is enabled, we start off with
+      // partition provider hive, but no partitions in the metastore. The user has to call
+      // `msck repair table` to populate the table partitions.
+      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
+        sparkSession.sessionState.conf.manageFilesourcePartitions)
+    // We will return Nil or throw exception at the beginning if the table already exists, so
+    // when we reach here, the table should not exist and we should set `ignoreIfExists` to
+    // false.
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+
     Seq.empty[Row]
   }
 }
 
-object CreateDataSourceTableCommand {
-  lazy val Identity = "^(\\w+)_identity$".r
-  lazy val Year = "^(\\w+)_year$".r
-  lazy val Month = "^(\\w+)_month$".r
-  lazy val Day = "^(\\w+)_day$".r
-  lazy val Hour = "^(\\w+)_hour$".r
-  lazy val Bucket = "^(\\w+)_bucket_(\\d+)$".r
-  lazy val Truncate = "^(\\w+)_truncate_(\\d+)$".r
-}
-
 /**
  * A command used to create a data source table using the result of a query.
  *
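For reference, the deleted branch mapped partition column names onto Iceberg partition transforms purely by suffix convention, using the regex extractors removed from the `CreateDataSourceTableCommand` companion object. The sketch below is a minimal, self-contained replay of that naming convention on a few hypothetical column names; it reuses only the deleted regexes and prints the transform each name would have selected, without calling the Iceberg or Metacat APIs.

object PartitionNameConvention {
  // Regexes copied from the removed companion object.
  val Year = "^(\\w+)_year$".r
  val Month = "^(\\w+)_month$".r
  val Day = "^(\\w+)_day$".r
  val Hour = "^(\\w+)_hour$".r
  val Bucket = "^(\\w+)_bucket_(\\d+)$".r
  val Truncate = "^(\\w+)_truncate_(\\d+)$".r
  val Identity = "^(\\w+)_identity$".r

  // Describe which Iceberg transform a partition column name would have selected.
  def describe(column: String): String = column match {
    case Year(name) => s"year($name)"
    case Month(name) => s"month($name)"
    case Day(name) => s"day($name)"
    case Hour(name) => s"hour($name)"
    case Bucket(name, n) => s"bucket($name, $n)"
    case Truncate(name, w) => s"truncate($name, $w)"
    case Identity(name) => s"identity($name)"
    case name => s"identity($name)"  // anything else fell back to an identity partition
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical partition column names following the old suffix convention.
    Seq("ts_month", "id_bucket_16", "region").foreach(c => println(s"$c -> ${describe(c)}"))
  }
}

Running it prints ts_month -> month(ts), id_bucket_16 -> bucket(id, 16), region -> identity(region), which mirrors the resolution order of the removed foreach (minus its schema-field checks).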

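After this change there is a single code path in this command: the relation is resolved to validate the arguments before any metadata is written, and when CREATE TABLE gives no schema both the schema and the partition columns come from `resolveRelation()`. A minimal usage sketch, assuming a running Spark deployment; the table names and location below are hypothetical.

import org.apache.spark.sql.SparkSession

object CreateDataSourceTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create-data-source-table-example")
      .enableHiveSupport()
      .getOrCreate()

    // Explicit schema: partitionColumnNames is taken directly from the statement.
    spark.sql(
      """CREATE TABLE sales (id BIGINT, amount DOUBLE, day STRING)
        |USING parquet
        |PARTITIONED BY (day)""".stripMargin)

    // No schema: resolveRelation() infers both the schema and the partition columns
    // from the files already present under the (hypothetical) location.
    spark.sql(
      """CREATE TABLE sales_external
        |USING parquet
        |LOCATION '/warehouse/sales_external'""".stripMargin)

    spark.stop()
  }
}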