Skip to content

Commit adb506a

Browse files
wangyumgatorsmile
authored andcommitted
[SPARK-28852][SQL] Implement SparkGetCatalogsOperation for Thrift Server
### What changes were proposed in this pull request? This PR implements `SparkGetCatalogsOperation` for Thrift Server metadata completeness. ### Why are the changes needed? Thrift Server metadata completeness. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test Closes #25555 from wangyum/SPARK-28852. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Xiao Li <[email protected]>
1 parent a3328cd commit adb506a

File tree

5 files changed

+100
-2
lines changed

5 files changed

+100
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.thriftserver
19+
20+
import java.util.UUID
21+
22+
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
23+
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
24+
import org.apache.hive.service.cli.operation.GetCatalogsOperation
25+
import org.apache.hive.service.cli.session.HiveSession
26+
27+
import org.apache.spark.internal.Logging
28+
import org.apache.spark.sql.SQLContext
29+
import org.apache.spark.util.{Utils => SparkUtils}
30+
31+
/**
32+
* Spark's own GetCatalogsOperation
33+
*
34+
* @param sqlContext SQLContext to use
35+
* @param parentSession a HiveSession from SessionManager
36+
*/
37+
private[hive] class SparkGetCatalogsOperation(
38+
sqlContext: SQLContext,
39+
parentSession: HiveSession)
40+
extends GetCatalogsOperation(parentSession) with Logging {
41+
42+
private var statementId: String = _
43+
44+
override def close(): Unit = {
45+
super.close()
46+
HiveThriftServer2.listener.onOperationClosed(statementId)
47+
}
48+
49+
override def runInternal(): Unit = {
50+
statementId = UUID.randomUUID().toString
51+
val logMsg = "Listing catalogs"
52+
logInfo(s"$logMsg with $statementId")
53+
setState(OperationState.RUNNING)
54+
// Always use the latest class loader provided by executionHive's state.
55+
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
56+
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
57+
58+
HiveThriftServer2.listener.onStatementStart(
59+
statementId,
60+
parentSession.getSessionHandle.getSessionId.toString,
61+
logMsg,
62+
statementId,
63+
parentSession.getUsername)
64+
65+
try {
66+
if (isAuthV2Enabled) {
67+
authorizeMetaGets(HiveOperationType.GET_CATALOGS, null)
68+
}
69+
setState(OperationState.FINISHED)
70+
} catch {
71+
case e: HiveSQLException =>
72+
setState(OperationState.ERROR)
73+
HiveThriftServer2.listener.onStatementError(
74+
statementId, e.getMessage, SparkUtils.exceptionString(e))
75+
throw e
76+
}
77+
HiveThriftServer2.listener.onStatementFinish(statementId)
78+
}
79+
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ private[thriftserver] class SparkSQLOperationManager()
6363
operation
6464
}
6565

66+
override def newGetCatalogsOperation(
67+
parentSession: HiveSession): GetCatalogsOperation = synchronized {
68+
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
69+
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
70+
" initialized or had already closed.")
71+
val operation = new SparkGetCatalogsOperation(sqlContext, parentSession)
72+
handleToOperation.put(operation.getHandle, operation)
73+
logDebug(s"Created GetCatalogsOperation with session=$parentSession.")
74+
operation
75+
}
76+
6677
override def newGetSchemasOperation(
6778
parentSession: HiveSession,
6879
catalogName: String,

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,12 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
223223
assert(!rs.next())
224224
}
225225
}
226+
227+
test("Spark's own GetCatalogsOperation(SparkGetCatalogsOperation)") {
228+
withJdbcStatement() { statement =>
229+
val metaData = statement.getConnection.getMetaData
230+
val rs = metaData.getCatalogs
231+
assert(!rs.next())
232+
}
233+
}
226234
}

sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class GetCatalogsOperation extends MetadataOperation {
3636
private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
3737
.addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.");
3838

39-
private final RowSet rowSet;
39+
protected final RowSet rowSet;
4040

4141
protected GetCatalogsOperation(HiveSession parentSession) {
4242
super(parentSession, OperationType.GET_CATALOGS);

sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class GetCatalogsOperation extends MetadataOperation {
3636
private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
3737
.addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.");
3838

39-
private final RowSet rowSet;
39+
protected final RowSet rowSet;
4040

4141
protected GetCatalogsOperation(HiveSession parentSession) {
4242
super(parentSession, OperationType.GET_CATALOGS);

0 commit comments

Comments
 (0)