Skip to content

[WIP] Onboarding StateV2 API onto New Error Class Framework #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
30 changes: 30 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -3183,6 +3183,24 @@
],
"sqlState" : "0A000"
},
"STATE_STORE_MULTIPLE_COLUMN_FAMILIES" : {
"message" : [
"Creating multiple column families with <stateStoreProvider> is not supported"
],
"sqlState" : "42802"
},
"STATE_STORE_MULTIPLE_VALUES_PER_KEY" : {
"message" : [
"Store does not support multiple values per key"
],
"sqlState" : "42802"
},
"STATE_STORE_UNSUPPORTED_OPERATION" : {
"message" : [
"<operationType> operation not supported with <entity>"
],
"sqlState" : "XXKST"
},
"STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
"message" : [
"Static partition column <staticName> is also specified in the column list."
Expand Down Expand Up @@ -3336,6 +3354,18 @@
],
"sqlState" : "428EK"
},
"TWS_IMPLICIT_KEY_NOT_FOUND" : {
"message" : [
"Implicit key not found for operation on stateName=<stateName>"
],
"sqlState" : "55019"
},
"TWS_VALUE_SHOULD_BE_NONNULL" : {
"message" : [
"New value should be non-null for <typeOfState>"
],
"sqlState" : "22004"
},
"UDTF_ALIAS_NUMBER_MISMATCH" : {
"message" : [
"The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF.",
Expand Down
30 changes: 30 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,24 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists

Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.

### STATE_STORE_MULTIPLE_COLUMN_FAMILIES

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Creating multiple column families with `<stateStoreProvider>` is not supported

### STATE_STORE_MULTIPLE_VALUES_PER_KEY

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Store does not support multiple values per key

### STATE_STORE_UNSUPPORTED_OPERATION

[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)

`<operationType>` operation not supported with `<entity>`

### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST

[SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Expand Down Expand Up @@ -2120,6 +2138,18 @@ Choose a different name, drop or replace the existing view, or add the IF NOT E

CREATE TEMPORARY VIEW or the corresponding Dataset APIs only accept single-part view names, but got: `<actualName>`.

### TWS_IMPLICIT_KEY_NOT_FOUND

SQLSTATE: 55019

Implicit key not found for operation on stateName=`<stateName>`

### TWS_VALUE_SHOULD_BE_NONNULL

[SQLSTATE: 22004](sql-error-conditions-sqlstates.html#class-22-data-exception)

New value should be non-null for `<typeOfState>`

### UDTF_ALIAS_NUMBER_MISMATCH

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors}
import org.apache.spark.sql.streaming.ValueState
import org.apache.spark.sql.types._

Expand All @@ -43,8 +43,7 @@ class ValueStateImpl[S](
private def encodeKey(): UnsafeRow = {
val keyOption = ImplicitKeyTracker.getImplicitKeyOption
if (!keyOption.isDefined) {
throw new UnsupportedOperationException("Implicit key not found for operation on" +
s"stateName=$stateName")
throw StateStoreErrors.implicitKeyNotFound(stateName = stateName)
}

val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId

override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Creating multiple column families with " +
"HDFSBackedStateStoreProvider is not supported")
throw StateStoreErrors.multipleColumnFamilies("HDFSStateStoreProvider")
}

override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class StateStoreChangelogWriterV1(
}

override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v1")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Put", entity = "changelog writer v1")
}

override def delete(key: Array[Byte]): Unit = {
Expand All @@ -151,8 +151,8 @@ class StateStoreChangelogWriterV1(
}

override def delete(key: Array[Byte], colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v1")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Delete", entity = "changelog writer v1")
}

override def commit(): Unit = {
Expand Down Expand Up @@ -189,8 +189,8 @@ class StateStoreChangelogWriterV2(
extends StateStoreChangelogWriter(fm, file, compressionCodec) {

override def put(key: Array[Byte], value: Array[Byte]): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v2")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Put", entity = "changelog writer v2")
}

override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
Expand All @@ -206,8 +206,8 @@ class StateStoreChangelogWriterV2(
}

override def delete(key: Array[Byte]): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v2")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Delete", entity = "changelog writer v1")
}

override def delete(key: Array[Byte], colFamilyName: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.state

import org.apache.spark.{SparkRuntimeException, SparkUnsupportedOperationException}

/**
* Object for grouping error messages from (most) exceptions thrown from State API V2
*
* ERROR_CLASS has a prefix of "STV2_" representing State API V2.
*/
object StateStoreErrors {
def implicitKeyNotFound(
stateName: String): TransformWithStateImplicitKeyNotFound = {
new TransformWithStateImplicitKeyNotFound(stateName)
}

def multipleColumnFamilies(stateStoreProvider: String):
TransformWithStateMultipleColumnFamilies = {
new TransformWithStateMultipleColumnFamilies(stateStoreProvider)
}

def multipleValuesPerKey(): TransformWithStateMultipleValuesPerKey = {
new TransformWithStateMultipleValuesPerKey()
}

def unsupportedOperationException(operationName: String, entity: String):
TransformWithStateUnsupportedOperation = {
new TransformWithStateUnsupportedOperation(operationName, entity)
}
def valueShouldBeNonNull(typeOfState: String): TransformWithStateValueShouldBeNonNull = {
new TransformWithStateValueShouldBeNonNull(typeOfState)
}
}
class TransformWithStateImplicitKeyNotFound(stateName: String)
extends SparkUnsupportedOperationException(
errorClass = "TWS_IMPLICIT_KEY_NOT_FOUND",
messageParameters = Map("stateName" -> stateName)
)

class TransformWithStateMultipleColumnFamilies(stateStoreProvider: String)
extends SparkUnsupportedOperationException(
errorClass = "STAT_STORE_MULTIPLE_COLUMN_FAMILIES",
messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
)

// Used for ListState
class TransformWithStateMultipleValuesPerKey()
extends SparkRuntimeException(
errorClass = "STATE_STORE_STORE_MULTIPLE_VALUES_PER_KEY",
messageParameters = Map.empty
)

class TransformWithStateUnsupportedOperation(operationType: String, entity: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
messageParameters = Map("operationType" -> operationType, "entity" -> entity)
)

// Used for ListState
class TransformWithStateValueShouldBeNonNull(typeOfState: String)
extends SparkRuntimeException(
errorClass = "TWS_VALUE_SHOULD_BE_NONNULL",
Map("typeOfState" -> typeOfState),
cause = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MemoryStateStore extends StateStore() {
}

override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Creating multiple column families is not supported")
throw StateStoreErrors.multipleColumnFamilies("MemoryStateStoreProvider")
}

override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = map.get(key)
Expand Down