[SPARK-2710] [SQL] Build SchemaRDD from a JdbcRDD with MetaData #1612

Closed · wants to merge 7 commits

44 changes: 41 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -35,14 +35,18 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e
* @param getConnection a function that returns an open Connection.
* The RDD takes care of closing the connection.
* @param sql the text of the query.
* The query must contain two ? placeholders for parameters used to partition the results.
* The query must contain two ? placeholders for parameters used to partition the results,
when you want to use more than one partition.
* E.g. "select title, author from books where ? <= id and id <= ?"
If numPartitions is set to exactly 1, the query does not need to contain any ? placeholder.
* @param lowerBound the minimum value of the first placeholder
* @param upperBound the maximum value of the second placeholder
* The lower and upper bounds are inclusive.
If the query does not contain any ? placeholder, lowerBound and upperBound can be set to any value.
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
If the query does not contain any ? placeholder, numPartitions must be set to exactly 1.
* @param mapRow a function from a ResultSet to a single row of the desired result type(s).
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
@@ -57,6 +61,8 @@ class JdbcRDD[T: ClassTag](
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
extends RDD[T](sc, Nil) with Logging {

private var schema: Seq[(String, Int, Boolean)] = null
Contributor:

Move the schema stuff to JdbcResultSetRDD? We'd better keep Spark core clean and follow the same implementation pattern as the other core RDDs.

Contributor Author:

Yep, I tried to do it the way you suggested, but there is no public method or attribute for getting the ResultSet or Statement from JdbcRDD in Spark core, so in JdbcResultSetRDD I have no way to get the metadata from the JdbcRDD. Otherwise we could do something like jdbcRDD.head and get the metadata from the first row, but that may execute the whole query at the plan phase.


override def getPartitions: Array[Partition] = {
// bounds are inclusive, hence the + 1 here and - 1 on end
val length = 1 + upperBound - lowerBound
@@ -67,6 +73,32 @@ class JdbcRDD[T: ClassTag](
}).toArray
}

def getSchema: Seq[(String, Int, Boolean)] = {
Contributor Author:

Here I originally tried to return a java.sql.ResultSetMetaData object and then build the Seq[(String, Int, Boolean)] for the SchemaRDD on the Spark SQL side, but when I ran that SchemaRDD I got "java.io.NotSerializableException: org.postgresql.jdbc4.Jdbc4ResultSetMetaData".

So I made this method return a Seq[(String, Int, Boolean)] instead, and on the Spark SQL side I map that Seq[(String, Int, Boolean)] to Seq[StructField].

Contributor:

Can you add this as a comment here?

Contributor:

We should probably also make this private[spark].

if (null != schema) {
return schema
}

val conn = getConnection()
Contributor:

Is this connection guaranteed to get closed? It won't benefit from the addOnCompleteCallback below, for instance.

val stmt = conn.prepareStatement(sql)
val metadata = stmt.getMetaData
try {
if (null != stmt && ! stmt.isClosed()) {
stmt.close()
}
} catch {
case e: Exception => logWarning("Exception closing statement", e)
}
schema = Seq[(String, Int, Boolean)]()
for(i <- 1 to metadata.getColumnCount) {
schema :+= (
Contributor:

Are there any thread safety concerns regarding mutating schema here?

metadata.getColumnName(i),
metadata.getColumnType(i),
metadata.isNullable(i) == java.sql.ResultSetMetaData.columnNullable
)
}
schema
}
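As a side note on the connection-leak question raised above, a variant of getSchema that closes both the statement and the connection might look like the sketch below (getSchemaSafely is a hypothetical name; this is not what the PR implements):

// Sketch: fetch the metadata eagerly and make sure both the statement and
// the connection opened here are closed, even if the metadata lookup fails.
private def getSchemaSafely(): Seq[(String, Int, Boolean)] = {
  if (schema != null) {
    return schema
  }
  val conn = getConnection()
  try {
    val stmt = conn.prepareStatement(sql)
    try {
      val metadata = stmt.getMetaData
      schema = (1 to metadata.getColumnCount).map { i =>
        (metadata.getColumnName(i),
          metadata.getColumnType(i),
          metadata.isNullable(i) == java.sql.ResultSetMetaData.columnNullable)
      }
    } finally {
      stmt.close()
    }
  } finally {
    try conn.close() catch { case e: Exception => logWarning("Exception closing connection", e) }
  }
  schema
}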

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
context.addTaskCompletionListener{ context => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
@@ -81,8 +113,14 @@ class JdbcRDD[T: ClassTag](
logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
}

stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val parameterCount = stmt.getParameterMetaData.getParameterCount
if (parameterCount > 0) {
Contributor:

Should we put this in a new PR? It seems like an enhancement / bug fix for JdbcRDD.

Contributor Author:

I am afraid they do not consider it a problem; from the original comment on JdbcRDD:

 * @param sql the text of the query.
 *   The query must contain two ? placeholders for parameters used to partition the results.
 *   E.g. "select title, author from books where ? <= id and id <= ?"

But I believe many users just want to pull the whole table out of the RDBMS and then do some computation in Spark's magic world. How many partitions get created does not matter much, because in the typical use case the tables stored in the RDBMS are small, so these two ? placeholders for partitioning are not always necessary.

Contributor:

Yeah, I agree. I was annoyed when I had to work around this. As long as it's backwards compatible I'm okay including it here.

Contributor:

Not that there's anything wrong with backwards compatible fixes/enhancements, but a few things I noticed here:

  1. If it's a sufficiently small table that a user is only using 1 partition, why not encourage them to query it from the driver and broadcast it?
  2. As it stands, it looks like you allow 0, 1, 2, or more ? placeholders, but the doc comment change only describes the 0 or 2 case.

stmt.setLong(1, part.lower)
}
if (parameterCount > 1) {
stmt.setLong(2, part.upper)
}

val rs = stmt.executeQuery()

override def getNext: T = {
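For reference, with the relaxed placeholder handling above, both of the following calls would be valid. This is only an illustration: sc is assumed to be an existing SparkContext, and the Derby URL and books table are made up.

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

// Whole-table query with no ? placeholders: numPartitions must be exactly 1,
// and lowerBound/upperBound are ignored.
val wholeTable = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:derby:memory:testdb"),
  "SELECT title, author FROM books",
  0, 0, 1,
  rs => (rs.getString(1), rs.getString(2)))

// Partitioned query keeps the original two-placeholder contract.
val partitioned = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:derby:memory:testdb"),
  "SELECT title, author FROM books WHERE ? <= id AND id <= ?",
  1, 20, 2,
  rs => (rs.getString(1), rs.getString(2)))
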
5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -73,6 +73,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
53 changes: 52 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
@@ -35,8 +36,10 @@ import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
import org.apache.spark.sql.json._
import org.apache.spark.sql.jdbc._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.{Logging, SparkContext}
import java.sql.{DriverManager, ResultSet}

/**
* :: AlphaComponent ::
@@ -204,6 +207,54 @@
applySchema(rowRDD, appliedSchema)
}

/**
* Loads from JDBC, returning the ResultSet as a [[SchemaRDD]].
* It gets MetaData from ResultSet of PreparedStatement to determine the schema.
*
* @group userf
*/
def jdbcResultSet(
Contributor:

For the 1.2 release we are going to be focusing on adding more external datasources. As part of this we are trying to change the way we add them, to avoid SQLContext getting too large. What do you think about adding an object, org.apache.spark.sql.jdbc.JDBC, that has these methods instead of adding them here?

Contributor:

What we did for Spark Streaming is that each of the external connectors has a module called XUtils with static utility functions for creating things:

object KafkaUtils {
  def createKafkaStream(streamingContext: StreamingContext)
}

For this reason it might be good to call this JDBCUtils, in a similar fashion.

connectString: String,
sql: String): SchemaRDD = {
jdbcResultSet(connectString, "", "", sql, 0, 0, 1)
}

def jdbcResultSet(
connectString: String,
username: String,
password: String,
sql: String): SchemaRDD = {
jdbcResultSet(connectString, username, password, sql, 0, 0, 1)
}

def jdbcResultSet(
connectString: String,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): SchemaRDD = {
jdbcResultSet(connectString, "", "", sql, lowerBound, upperBound, numPartitions)
}

def jdbcResultSet(
connectString: String,
username: String,
password: String,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): SchemaRDD = {
val resultSetRDD = new JdbcRDD(
sparkContext,
() => { DriverManager.getConnection(connectString, username, password) },
sql, lowerBound, upperBound, numPartitions,
(r: ResultSet) => r
)
val appliedSchema = JdbcResultSetRDD.inferSchema(resultSetRDD)
val rowRDD = JdbcResultSetRDD.jdbcResultSetToRow(resultSetRDD, appliedSchema)
applySchema(rowRDD, appliedSchema)
}

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
@@ -411,7 +462,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }

def simpleString: String =
def simpleString: String =
s"""== Physical Plan ==
|${stringOrError(executedPlan)}
"""
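To show how the new SQLContext methods above would be used from application code, here is an illustrative call site. The connect string, credentials, and table names are invented, and sqlContext is assumed to be an existing SQLContext:

// Single-partition load of a whole table; the query needs no ? placeholders.
val people = sqlContext.jdbcResultSet(
  "jdbc:postgresql://localhost/testdb", "user", "secret",
  "SELECT id, name, age FROM people")
people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 30").collect()

// Partitioned load using the two-placeholder form.
val orders = sqlContext.jdbcResultSet(
  "jdbc:postgresql://localhost/testdb", "user", "secret",
  "SELECT * FROM orders WHERE ? <= id AND id <= ?",
  1L, 100000L, 10)
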
72 changes: 72 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcResultSetRDD.scala
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.sql.ResultSet

import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.Logging

private[sql] object JdbcResultSetRDD extends Logging {

private[sql] def inferSchema(
jdbcResultSet: JdbcRDD[ResultSet]): StructType = {
StructType(createSchema(jdbcResultSet.getSchema))
}

private def createSchema(metaSchema: Seq[(String, Int, Boolean)]): Seq[StructField] = {
metaSchema.map(e => StructField(e._1, JdbcTypes.toPrimitiveDataType(e._2), e._3))
}

private[sql] def jdbcResultSetToRow(
jdbcResultSet: JdbcRDD[ResultSet],
schema: StructType) : RDD[Row] = {
val row = new GenericMutableRow(schema.fields.length)
jdbcResultSet.map(asRow(_, row, schema.fields))
Contributor:

If you are going to reuse the row object (which is a good idea), I'd use mapPartitions instead and create the object inside the closure.

}

private def asRow(rs: ResultSet, row: GenericMutableRow, schemaFields: Seq[StructField]): Row = {
var i = 0
while (i < schemaFields.length) {
schemaFields(i).dataType match {
case StringType => row.update(i, rs.getString(i + 1))
case DecimalType => row.update(i, rs.getBigDecimal(i + 1))
case BooleanType => row.update(i, rs.getBoolean(i + 1))
case ByteType => row.update(i, rs.getByte(i + 1))
case ShortType => row.update(i, rs.getShort(i + 1))
case IntegerType => row.update(i, rs.getInt(i + 1))
case LongType => row.update(i, rs.getLong(i + 1))
case FloatType => row.update(i, rs.getFloat(i + 1))
case DoubleType => row.update(i, rs.getDouble(i + 1))
case BinaryType => row.update(i, rs.getBytes(i + 1))
case TimestampType => row.update(i, rs.getTimestamp(i + 1))
case _ => sys.error(
s"Unsupported jdbc datatype")
Contributor:

Would be good to print what the unsupported type is. Also, try to wrap at the highest syntactic level, for example:

case unsupportedType =>
  sys.error(s"Unsupported jdbc datatype: $unsupportedType")

(Though actually in this case I think it'll all fit on one line).

}
if (rs.wasNull) row.update(i, null)
i += 1
}

row
}
}
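As a concrete version of the mapPartitions suggestion in the review above, jdbcResultSetToRow could create the reused GenericMutableRow inside the closure, so that each partition works on its own instance (a sketch only, not the code in this PR):

private[sql] def jdbcResultSetToRow(
    jdbcResultSet: JdbcRDD[ResultSet],
    schema: StructType): RDD[Row] = {
  val fields = schema.fields
  jdbcResultSet.mapPartitions { iter =>
    // One mutable row per partition, never shared between tasks.
    val row = new GenericMutableRow(fields.length)
    iter.map(asRow(_, row, fields))
  }
}
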
77 changes: 77 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcTypes.scala
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.types._

private[sql] object JdbcTypes extends Logging {

/**
* More about JDBC types mapped to Java types:
* http://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html#1051555
*
* Compatibility of ResultSet getter Methods defined in JDBC spec:
* http://download.oracle.com/otn-pub/jcp/jdbc-4_1-mrel-spec/jdbc4.1-fr-spec.pdf
* page 211
*/
def toPrimitiveDataType(jdbcType: Int): DataType =
jdbcType match {
case java.sql.Types.LONGVARCHAR
| java.sql.Types.VARCHAR
| java.sql.Types.CHAR => StringType
case java.sql.Types.NUMERIC
| java.sql.Types.DECIMAL => DecimalType
case java.sql.Types.BIT => BooleanType
case java.sql.Types.TINYINT => ByteType
case java.sql.Types.SMALLINT => ShortType
case java.sql.Types.INTEGER => IntegerType
case java.sql.Types.BIGINT => LongType
case java.sql.Types.REAL => FloatType
case java.sql.Types.FLOAT
| java.sql.Types.DOUBLE => DoubleType
case java.sql.Types.LONGVARBINARY
| java.sql.Types.VARBINARY
| java.sql.Types.BINARY => BinaryType
// Timestamp's getter should also be able to get DATE and TIME according to JDBC spec
case java.sql.Types.TIMESTAMP
| java.sql.Types.DATE
| java.sql.Types.TIME => TimestampType

// TODO: CLOB only works with getClob or getAscIIStream
// case java.sql.Types.CLOB

// TODO: BLOB only works with getBlob or getBinaryStream
// case java.sql.Types.BLOB

// TODO: nested types
// case java.sql.Types.ARRAY => ArrayType
// case java.sql.Types.STRUCT => StructType

// TODO: unsupported types
// case java.sql.Types.DISTINCT
// case java.sql.Types.REF

// TODO: more about JAVA_OBJECT:
// http://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html#1038181
// case java.sql.Types.JAVA_OBJECT => BinaryType

case _ => sys.error(
Contributor:

Same here: include the type that isn't supported.

s"Unsupported jdbc datatype")
}
}
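As a small, hypothetical illustration of the mapping defined above (JdbcTypes is private[sql], so code like this would have to live inside the org.apache.spark.sql packages):

package org.apache.spark.sql.jdbc

import java.sql.Types
import org.apache.spark.sql.catalyst.types._

// Spot-checks of the JDBC-to-Catalyst type mapping defined above.
object JdbcTypesExample {
  def main(args: Array[String]): Unit = {
    assert(JdbcTypes.toPrimitiveDataType(Types.VARCHAR) == StringType)
    assert(JdbcTypes.toPrimitiveDataType(Types.NUMERIC) == DecimalType)
    assert(JdbcTypes.toPrimitiveDataType(Types.BIGINT) == LongType)
    assert(JdbcTypes.toPrimitiveDataType(Types.TIMESTAMP) == TimestampType)
  }
}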