
Column name being parsed instead of column value when querying with Spark using aws-wrapper #1370

@cyonglun

Description


Describe the bug

Using the aws-wrapper driver in a SparkSession appears to break reads.
Reverting to the plain MySQL driver (com.mysql.cj.jdbc.Driver) and retrieving the IAM auth token manually allows the read to succeed; a sketch of that workaround is included below.
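
A minimal sketch of this workaround, assuming the AWS SDK for Java v2 RDS module (software.amazon.awssdk:rds) is on the classpath and the execution role has rds-db:connect permission; host, port, username, database, and the region are placeholders, and sparkSession/query are the same as in the reproduction snippet further down:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.rds.RdsUtilities;

// Generate a short-lived IAM auth token and use it as the JDBC password
// with the plain MySQL Connector/J driver instead of the aws-wrapper.
String host = "my-cluster.cluster-xxxx.ap-southeast-1.rds.amazonaws.com"; // placeholder
int port = 3306;
String username = "iam_user"; // placeholder

String token = RdsUtilities.builder()
        .region(Region.AP_SOUTHEAST_1) // placeholder region
        .build()
        .generateAuthenticationToken(b -> b
            .hostname(host)
            .port(port)
            .username(username));

var read = sparkSession
        .read()
        .format("jdbc")
        // IAM authentication requires TLS, hence sslMode=REQUIRED on the plain MySQL URL
        .option("url", "jdbc:mysql://" + host + ":" + port + "/" + database + "?sslMode=REQUIRED")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("user", username)
        .option("password", token) // IAM token in place of a password
        .option("dbtable", query);

read.load().show(); // succeeds with the non-wrapper driver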

Expected Behavior

The SparkSession read should correctly decode column values according to the reported column type.

What plugins are used? What other connection properties were set?

iam

Current Behavior

A simple MySQL query, select id, row_number() over(order by (select null)) as rno from table_name;, fails during the read, even though the ResultSet reports the correct data type (rno is DECIMAL):

SparkSqlReader: Schema JSON: {
  "type" : "struct",
  "fields" : [ {
    "name" : "rno",
    "type" : "decimal(38,0)",
    "nullable" : true,
    "metadata" : {
      "isSigned" : true,
      "scale" : 0
    }
  }]
}

The exception seems to indicate that the column name rno is being parsed instead of the column value (a minimal check that isolates this call outside Spark is sketched after the stack trace):

WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) ([2406:da18:7a9:9701:dcfb:3e3c:9391:425] executor 12): java.sql.SQLDataException: Cannot determine value type from string 'rno'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:106)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:81)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:55)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:65)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:88)
	at com.mysql.cj.jdbc.result.ResultSetImpl.getBigDecimal(ResultSetImpl.java:670)
	at software.amazon.jdbc.wrapper.ResultSetWrapper.lambda$getBigDecimal$15(ResultSetWrapper.java:235)
	at software.amazon.jdbc.plugin.DefaultConnectionPlugin.execute(DefaultConnectionPlugin.java:128)
	at software.amazon.jdbc.ConnectionPluginManager.lambda$execute$5(ConnectionPluginManager.java:348)
	at software.amazon.jdbc.ConnectionPluginManager.lambda$null$0(ConnectionPluginManager.java:270)
	at software.amazon.jdbc.ConnectionPluginManager.executeWithTelemetry(ConnectionPluginManager.java:246)
	at software.amazon.jdbc.ConnectionPluginManager.lambda$makePluginChainFunc$1(ConnectionPluginManager.java:270)
	at software.amazon.jdbc.ConnectionPluginManager.executeWithSubscribedPlugins(ConnectionPluginManager.java:236)
	at software.amazon.jdbc.ConnectionPluginManager.execute(ConnectionPluginManager.java:345)
	at software.amazon.jdbc.util.WrapperUtils.executeWithPlugins(WrapperUtils.java:244)
	at software.amazon.jdbc.wrapper.ResultSetWrapper.getBigDecimal(ResultSetWrapper.java:229)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:416)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:414)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:340)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.UnsafeRowInterceptor.hasNext(UnsafeRowInterceptor.java:24)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.start(UnsafeShuffleWriter.java:229)
	at org.apache.spark.shuffle.DirectShuffleWriteProcessor.doWrite(DirectShuffleWriteProcessor.scala:44)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:69)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
	at org.apache.spark.scheduler.Task.run(Task.scala:152)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.mysql.cj.exceptions.DataConversionException: Cannot determine value type from string 'rno'
	at com.mysql.cj.result.AbstractNumericValueFactory.createFromBytes(AbstractNumericValueFactory.java:56)
	at com.mysql.cj.protocol.a.MysqlTextValueDecoder.decodeByteArray(MysqlTextValueDecoder.java:151)
	at com.mysql.cj.protocol.result.AbstractResultsetRow.decodeAndCreateReturnValue(AbstractResultsetRow.java:127)
	at com.mysql.cj.protocol.result.AbstractResultsetRow.getValueFromBytes(AbstractResultsetRow.java:235)
	at com.mysql.cj.protocol.a.result.ByteArrayRow.getValue(ByteArrayRow.java:82)
	... 39 more
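
Only a diagnostic sketch, assuming the same jdbcUrl, username, and plugin settings as in the reproduction below: calling the same getter directly through the wrapper, outside Spark, should show whether ResultSetWrapper.getBigDecimal itself returns the value or fails the same way.

import java.sql.DriverManager;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("user", username);          // same user as in the reproduction
props.setProperty("wrapperPlugins", "iam");   // IAM auth, no password
props.setProperty("wrapperDialect", "aurora-mysql"); // string value of DialectCodes.AURORA_MYSQL

try (var conn = DriverManager.getConnection(jdbcUrl, props); // jdbc:aws-wrapper:mysql://...
     var stmt = conn.createStatement();
     var rs = stmt.executeQuery(
         "select id, row_number() over(order by (select null)) as rno from table_name")) {
    while (rs.next()) {
        // Mirrors what Spark's JdbcUtils getter does for DECIMAL columns.
        System.out.println(rs.getBigDecimal("rno"));
    }
}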

Reproduction Steps

Perform a load and show() using a SparkSession running on EMR Serverless.

var query = "(select  id, row_number() over(order by (select null)) as rno from table_name) dataset";

var read =
        sparkSession
            .read()
            .format("jdbc")
            .option("url", jdbcUrl) // in format "jdbc:aws-wrapper:mysql://{HOST}:{PORT}/{DATABASE}"
            .option("user", username)
            .option("dbtable", query)
            .option("driver","software.amazon.jdbc.Driver")
            .option("lowerBound", 1)
            .option("fetchsize", 2000000)
            .option("upperBound", 100)
            .option("numPartitions" 1)
            .option("partitionColumn", "rno")
            .option("wrapperPlugins", "iam") // no password, using iam auth
            .option("wrapperDialect", DialectCodes.AURORA_MYSQL)
            .option("wrapperTargetDriverDialect", TargetDriverDialectCodes.MYSQL_CONNECTOR_J);

var result = read.load();

result.show(); // throws DataConversionException
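
As a hedged diagnostic variant (names and options taken from the snippet above), running the same read without the JDBC partitioning options indicates whether the failure depends on Spark's partition handling of the rno column:

var unpartitioned = sparkSession
        .read()
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("user", username)
        .option("dbtable", query)
        .option("driver", "software.amazon.jdbc.Driver")
        .option("wrapperPlugins", "iam")
        .option("wrapperDialect", DialectCodes.AURORA_MYSQL)
        .option("wrapperTargetDriverDialect", TargetDriverDialectCodes.MYSQL_CONNECTOR_J);

unpartitioned.load().show(); // check whether the same DataConversionException is thrown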

Possible Solution

No response

Additional Information/Context

emr-7.6.0

The AWS Advanced JDBC Driver version used

2.5.6

JDK version used

17

Operating System and version

n/a
