
Commit 5928b39
[SPARK-3809][SQL] Fixes test suites in hive-thriftserver
As scwf pointed out, `HiveThriftServer2Suite` isn't effective anymore after the Thrift server was made a daemon. On the other hand, these test suites were known to be flaky. PR apache#2214 tried to fix them but failed because of an unknown Jenkins build error. This PR fixes both sets of issues.

In this PR, instead of watching the `start-thriftserver.sh` output, the test code starts a `tail` process to watch the log file. A `Thread.sleep` has to be introduced because the `kill` command used in `stop-thriftserver.sh` is not synchronous.

As for the root cause of the mysterious Jenkins build failure, please refer to [this comment](apache#2675 (comment)) below for details.

----

(Copied from the PR description of apache#2214)

This PR fixes two issues of `HiveThriftServer2Suite` and brings one enhancement:

1. Although the metastore, warehouse directories, and listening port are randomly chosen, all test cases share the same configuration. Due to parallel test execution, one of the two test cases is doomed to fail.
2. We caught any exception thrown from a test case and printed diagnosis information, but forgot to re-throw the exception...
3. When the forked server process ends prematurely (e.g., fails to start), the `serverRunning` promise is completed with a failure, preventing the test code from waiting until timeout.

So, embarrassingly, this test suite had been failing continuously for several days but no one had ever noticed it... Fortunately, no bugs in the production code were covered under the hood.
Author: Cheng Lian <[email protected]>
Author: wangfei <[email protected]>

Closes apache#2675 from liancheng/fix-thriftserver-tests and squashes the following commits:

1c384b7 [Cheng Lian] Minor code cleanup, restore the logging level hack in TestHive.scala
7805c33 [wangfei] reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths
af2b5a9 [Cheng Lian] Removes log level hacks from TestHiveContext
d116405 [wangfei] make sure that log4j level is INFO
ee92a82 [Cheng Lian] Relaxes timeout
7fd6757 [Cheng Lian] Fixes test suites in hive-thriftserver

Conflicts:
	sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
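The fail-fast behavior described in item 3 above can be sketched in isolation. This is a minimal illustration, not the suite's actual code; the object name, exit value, and message are made up:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.Try

object FailFastSketch extends App {
  val serverRunning = Promise[Unit]()

  // Simulate a forked server process that dies before it ever reports
  // readiness (the exit value here is hypothetical).
  Future {
    val exitValue = 1
    serverRunning.tryFailure(
      new RuntimeException(s"server process exit value: $exitValue"))
  }

  // Because the promise is completed with a failure, Await.result throws
  // the recorded exception immediately instead of blocking for the full
  // timeout and then raising a TimeoutException.
  val result = Try(Await.result(serverRunning.future, 10.seconds))
  assert(result.isFailure)
  println(result.failed.get.getMessage)
}
```

The same `Promise` is completed with success by the log watcher when the server reports it is listening, so whichever event happens first decides the outcome.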
1 parent faeca62 commit 5928b39

2 files changed: 65 additions & 42 deletions


sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala

Lines changed: 14 additions & 7 deletions
```diff
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}

-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.sql.catalyst.util.getTempFilePath

 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -62,9 +62,14 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {

     def captureOutput(source: String)(line: String) {
       buffer += s"$source> $line"
-      if (line.contains(expectedAnswers(next.get()))) {
-        if (next.incrementAndGet() == expectedAnswers.size) {
-          foundAllExpectedAnswers.trySuccess(())
+      // If we haven't found all expected answers...
+      if (next.get() < expectedAnswers.size) {
+        // If another expected answer is found...
+        if (line.startsWith(expectedAnswers(next.get()))) {
+          // If all expected answers have been found...
+          if (next.incrementAndGet() == expectedAnswers.size) {
+            foundAllExpectedAnswers.trySuccess(())
+          }
         }
       }
     }
@@ -75,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {

     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Spark SQL CLI process exit value: $exitValue")
+      foundAllExpectedAnswers.tryFailure(
+        new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
     }

     try {
@@ -96,6 +102,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
           |End CliSuite failure output
           |===========================
         """.stripMargin, cause)
+      throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
@@ -107,7 +114,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

-    runCliWithin(1.minute)(
+    runCliWithin(3.minute)(
       "CREATE TABLE hive_test(key INT, val STRING);"
         -> "OK",
       "SHOW TABLES;"
@@ -118,7 +125,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
         -> "Time taken: ",
       "SELECT COUNT(*) FROM hive_test;"
         -> "5",
-      "DROP TABLE hive_test"
+      "DROP TABLE hive_test;"
         -> "Time taken: "
     )
   }
```
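The guarded matching added to `captureOutput` can be exercised on its own. A minimal sketch, assuming hypothetical expected answers and output lines (the object name and sample data are illustrative, not from the suite):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Promise

object AnswerMatcherSketch extends App {
  val expectedAnswers = Seq("OK", "Time taken: ", "5")
  val next = new AtomicInteger(0)
  val foundAllExpectedAnswers = Promise[Unit]()

  // Mirrors the patched logic: stop indexing once every answer has been
  // found (so next.get() never runs past the end of the sequence), and
  // match each answer by prefix rather than by substring containment.
  def captureOutput(line: String): Unit = {
    if (next.get() < expectedAnswers.size) {
      if (line.startsWith(expectedAnswers(next.get()))) {
        if (next.incrementAndGet() == expectedAnswers.size) {
          foundAllExpectedAnswers.trySuccess(())
        }
      }
    }
  }

  // Interleaved noise lines are ignored; answers must appear in order.
  Seq("OK", "noise", "Time taken: 0.2 seconds", "5", "extra")
    .foreach(captureOutput)
  assert(foundAllExpectedAnswers.isCompleted)
}
```

The outer bounds check is what the original code was missing: without it, a stray extra match after the last expected answer would index past the end of `expectedAnswers`.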

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala

Lines changed: 51 additions & 35 deletions
```diff
@@ -17,17 +17,17 @@

 package org.apache.spark.sql.hive.thriftserver

-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-
 import java.io.File
 import java.net.ServerSocket
 import java.sql.{DriverManager, Statement}
 import java.util.concurrent.TimeoutException

+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Promise}
+import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Try
+
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 import org.scalatest.FunSuite
@@ -41,25 +41,25 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)

-  private val listeningHost = "localhost"
-  private val listeningPort = {
-    // Let the system to choose a random available port to avoid collision with other parallel
-    // builds.
-    val socket = new ServerSocket(0)
-    val port = socket.getLocalPort
-    socket.close()
-    port
-  }
-
-  private val warehousePath = getTempFilePath("warehouse")
-  private val metastorePath = getTempFilePath("metastore")
-  private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
-  def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
-    val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+  def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+    val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+    val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+
+    val warehousePath = getTempFilePath("warehouse")
+    val metastorePath = getTempFilePath("metastore")
+    val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+    val listeningHost = "localhost"
+    val listeningPort = {
+      // Let the system to choose a random available port to avoid collision with other parallel
+      // builds.
+      val socket = new ServerSocket(0)
+      val port = socket.getLocalPort
+      socket.close()
+      port
+    }

     val command =
-      s"""$serverScript
+      s"""$startScript
         | --master local
         | --hiveconf hive.root.logger=INFO,console
         | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
@@ -68,29 +68,40 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
       """.stripMargin.split("\\s+").toSeq

-    val serverStarted = Promise[Unit]()
+    val serverRunning = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
+    val LOGGING_MARK =
+      s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to "
+    var logTailingProcess: Process = null
+    var logFilePath: String = null

-    def captureOutput(source: String)(line: String) {
-      buffer += s"$source> $line"
+    def captureLogOutput(line: String): Unit = {
+      buffer += line
       if (line.contains("ThriftBinaryCLIService listening on")) {
-        serverStarted.success(())
+        serverRunning.success(())
       }
     }

-    val process = Process(command).run(
-      ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
-    Future {
-      val exitValue = process.exitValue()
-      logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+    def captureThriftServerOutput(source: String)(line: String): Unit = {
+      if (line.startsWith(LOGGING_MARK)) {
+        logFilePath = line.drop(LOGGING_MARK.length).trim
+        // Ensure that the log file is created so that the `tail' command won't fail
+        Try(new File(logFilePath).createNewFile())
+        logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath")
+          .run(ProcessLogger(captureLogOutput, _ => ()))
+      }
     }

+    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+    Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
+      captureThriftServerOutput("stdout"),
+      captureThriftServerOutput("stderr")))
+
     val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
     val user = System.getProperty("user.name")

     try {
-      Await.result(serverStarted.future, timeout)
+      Await.result(serverRunning.future, timeout)

       val connection = DriverManager.getConnection(jdbcUri, user, "")
       val statement = connection.createStatement()
@@ -122,10 +133,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         |End HiveThriftServer2Suite failure output
         |=========================================
       """.stripMargin, cause)
+      throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
-      process.destroy()
+      Process(stopScript).run().exitValue()
+      // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
+      Thread.sleep(3.seconds.toMillis)
+      Option(logTailingProcess).map(_.destroy())
+      Option(logFilePath).map(new File(_).delete())
     }
   }
```
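The random-port trick this diff moves inside `startThriftServerWithin` (so each test case gets its own port) works by binding a `ServerSocket` to port 0 and letting the OS pick an ephemeral port. A standalone sketch, with illustrative names; note there is a small race window between closing the socket and the server rebinding the port:

```scala
import java.net.ServerSocket

object RandomPortSketch extends App {
  // Ask the OS for any free ephemeral port, then release it so the
  // server under test can bind it. Not atomic: another process could
  // in principle grab the port before the server does.
  def freePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }

  val port = freePort()
  assert(port > 0 && port <= 65535)
  println(s"picked port $port")
}
```

Per-case ports (and per-case metastore and warehouse paths) are what make the suite safe under parallel test execution, which was issue 1 in the commit message.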
