17
17
18
18
package org .apache .spark .sql .hive .thriftserver
19
19
20
- import scala .collection .mutable .ArrayBuffer
21
- import scala .concurrent .ExecutionContext .Implicits .global
22
- import scala .concurrent .duration ._
23
- import scala .concurrent .{Await , Future , Promise }
24
- import scala .sys .process .{Process , ProcessLogger }
25
-
26
20
import java .io .File
27
21
import java .net .ServerSocket
28
22
import java .sql .{DriverManager , Statement }
29
23
import java .util .concurrent .TimeoutException
30
24
25
+ import scala .collection .mutable .ArrayBuffer
26
+ import scala .concurrent .duration ._
27
+ import scala .concurrent .{Await , Promise }
28
+ import scala .sys .process .{Process , ProcessLogger }
29
+ import scala .util .Try
30
+
31
31
import org .apache .hadoop .hive .conf .HiveConf .ConfVars
32
32
import org .apache .hive .jdbc .HiveDriver
33
33
import org .scalatest .FunSuite
@@ -41,25 +41,28 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
41
41
class HiveThriftServer2Suite extends FunSuite with Logging {
42
42
Class .forName(classOf [HiveDriver ].getCanonicalName)
43
43
44
- private val listeningHost = " localhost"
45
- private val listeningPort = {
46
- // Let the system to choose a random available port to avoid collision with other parallel
47
- // builds.
48
- val socket = new ServerSocket (0 )
49
- val port = socket.getLocalPort
50
- socket.close()
51
- port
52
- }
53
-
54
- private val warehousePath = getTempFilePath(" warehouse" )
55
- private val metastorePath = getTempFilePath(" metastore" )
56
- private val metastoreJdbcUri = s " jdbc:derby:;databaseName= $metastorePath;create=true "
57
-
58
- def startThriftServerWithin (timeout : FiniteDuration = 30 .seconds)(f : Statement => Unit ) {
59
- val serverScript = " ../../sbin/start-thriftserver.sh" .split(" /" ).mkString(File .separator)
44
+ val verbose = Option (System .getenv(" SPARK_SQL_TEST_VERBOSE" )).isDefined
45
+
46
+ def startThriftServerWithin (timeout : FiniteDuration = 10 .seconds)(f : Statement => Unit ) {
47
+ Thread .sleep(5000 )
48
+
49
+ val startScript = " ../../sbin/start-thriftserver.sh" .split(" /" ).mkString(File .separator)
50
+ val stopScript = " ../../sbin/stop-thriftserver.sh" .split(" /" ).mkString(File .separator)
51
+ val warehousePath = getTempFilePath(" warehouse" )
52
+ val metastorePath = getTempFilePath(" metastore" )
53
+ val metastoreJdbcUri = s " jdbc:derby:;databaseName= $metastorePath;create=true "
54
+ val listeningHost = " localhost"
55
+ val listeningPort = {
56
+ // Let the system to choose a random available port to avoid collision with other parallel
57
+ // builds.
58
+ val socket = new ServerSocket (0 )
59
+ val port = socket.getLocalPort
60
+ socket.close()
61
+ port
62
+ }
60
63
61
64
val command =
62
- s """ $serverScript
65
+ s """ $startScript
63
66
| --master local
64
67
| --hiveconf hive.root.logger=INFO,console
65
68
| --hiveconf ${ConfVars .METASTORECONNECTURLKEY }= $metastoreJdbcUri
@@ -68,29 +71,41 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
68
71
| --hiveconf ${ConfVars .HIVE_SERVER2_THRIFT_PORT }= $listeningPort
69
72
""" .stripMargin.split(" \\ s+" ).toSeq
70
73
71
- val serverStarted = Promise [Unit ]()
74
+ val serverRunning = Promise [Unit ]()
72
75
val buffer = new ArrayBuffer [String ]()
73
-
74
- def captureOutput (source : String )(line : String ) {
75
- buffer += s " $source> $line"
76
+ val LOGGING_MARK =
77
+ s " starting ${HiveThriftServer2 .getClass.getCanonicalName.stripSuffix(" $" )}, logging to "
78
+ var logTailingProcess : Process = null
79
+ var logFilePath : String = null
80
+
81
+ def captureLogOutput (line : String ): Unit = {
82
+ logInfo(s " server log | $line" )
83
+ buffer += line
76
84
if (line.contains(" ThriftBinaryCLIService listening on" )) {
77
- serverStarted .success(())
85
+ serverRunning .success(())
78
86
}
79
87
}
80
88
81
- val process = Process (command).run(
82
- ProcessLogger (captureOutput(" stdout" ), captureOutput(" stderr" )))
83
-
84
- Future {
85
- val exitValue = process.exitValue()
86
- logInfo(s " Spark SQL Thrift server process exit value: $exitValue" )
89
+ def captureThriftServerOutput (source : String )(line : String ): Unit = {
90
+ logInfo(s " server $source | $line" )
91
+ if (line.startsWith(LOGGING_MARK )) {
92
+ logFilePath = line.drop(LOGGING_MARK .length).trim
93
+ // Ensure that the log file is created so that the `tail' command won't fail
94
+ Try (new File (logFilePath).createNewFile())
95
+ logTailingProcess = Process (s " /usr/bin/env tail -f $logFilePath" )
96
+ .run(ProcessLogger (captureLogOutput, _ => ()))
97
+ }
87
98
}
88
99
100
+ Process (command).run(ProcessLogger (
101
+ captureThriftServerOutput(" stdout" ),
102
+ captureThriftServerOutput(" stderr" )))
103
+
89
104
val jdbcUri = s " jdbc:hive2:// $listeningHost: $listeningPort/ "
90
105
val user = System .getProperty(" user.name" )
91
106
92
107
try {
93
- Await .result(serverStarted .future, timeout)
108
+ Await .result(serverRunning .future, timeout)
94
109
95
110
val connection = DriverManager .getConnection(jdbcUri, user, " " )
96
111
val statement = connection.createStatement()
@@ -122,10 +137,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
122
137
|End HiveThriftServer2Suite failure output
123
138
|=========================================
124
139
""" .stripMargin, cause)
140
+ throw cause
125
141
} finally {
126
142
warehousePath.delete()
127
143
metastorePath.delete()
128
- process.destroy()
144
+ Process (stopScript).run().exitValue()
145
+ // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
146
+ Thread .sleep(3 .seconds.toMillis)
147
+ Option (logTailingProcess).map(_.destroy())
148
+ Option (logFilePath).map(new File (_).delete())
129
149
}
130
150
}
131
151
0 commit comments