
Commit 495a132

liancheng authored and marmbrus committed
[SQL] Fixes race condition in CliSuite
`CliSuite` has been flaky for a while; this PR improves the situation by fixing a race condition in `CliSuite`. The `captureOutput` function captures both the stdout and stderr output of the forked external process in two background threads and searches the captured lines for expected strings, but it was not properly synchronized before.

Author: Cheng Lian <[email protected]>

Closes #3060 from liancheng/fix-cli-suite and squashes the following commits:

a70569c [Cheng Lian] Fixes race condition in CliSuite
1 parent e4b8089 commit 495a132
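
For context, the synchronization pattern the patch adopts (a plain `var next` counter and a shared `buffer` guarded by `lock.synchronized` inside `captureOutput`) can be illustrated with a minimal, self-contained Scala sketch. This is not part of the commit: the object name, the expected answers, and the `bash -c "echo ..."` command are illustrative stand-ins for the forked Spark SQL CLI process driven by the real suite, and the sketch assumes a POSIX shell is available.

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.sys.process.{Process, ProcessLogger}

// Minimal sketch of the synchronization pattern; names and the echoed
// output are illustrative, not taken from CliSuite itself.
object SynchronizedCaptureSketch extends App {
  val expectedAnswers = Seq("hello", "world")

  // Mutable state shared by the stdout and stderr handler threads.
  var next = 0
  val buffer = new ArrayBuffer[String]()
  val foundAllExpectedAnswers = Promise[Unit]()
  val lock = new Object

  // Both handlers funnel through one lock, so `next` and `buffer` are
  // never mutated by two threads at the same time.
  def captureOutput(source: String)(line: String): Unit = lock.synchronized {
    buffer += s"$source> $line"
    if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
      next += 1
      if (next == expectedAnswers.size) {
        foundAllExpectedAnswers.trySuccess(())
      }
    }
  }

  // Stand-in for the forked Spark SQL CLI process: one stderr line and
  // two stdout lines that match the expected answers in order.
  val process = Process(Seq("bash", "-c", "echo starting 1>&2; echo hello; echo world"))
    .run(ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))

  Await.result(foundAllExpectedAnswers.future, 10.seconds)
  process.destroy()
  println(buffer.mkString("\n"))
}

With the whole handler body inside the lock, the order in which the two logger threads deliver lines can no longer corrupt the state: the `AtomicInteger` used before only made the counter atomic, not the check-then-increment sequence or the appends to the `ArrayBuffer`.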

File tree

1 file changed, +15 -20 lines
  • sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala


sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala

Lines changed: 15 additions & 20 deletions
@@ -18,19 +18,17 @@

 package org.apache.spark.sql.hive.thriftserver

+import java.io._
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}

-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}

-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.util.getTempFilePath

 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
       """.stripMargin.split("\\s+").toSeq ++ extraArgs
    }

-    // AtomicInteger is needed because stderr and stdout of the forked process are handled in
-    // different threads.
-    val next = new AtomicInteger(0)
+    var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
     val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
     val buffer = new ArrayBuffer[String]()
+    val lock = new Object

-    def captureOutput(source: String)(line: String) {
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
       buffer += s"$source> $line"
-      // If we haven't found all expected answers...
-      if (next.get() < expectedAnswers.size) {
-        // If another expected answer is found...
-        if (line.startsWith(expectedAnswers(next.get()))) {
-          // If all expected answers have been found...
-          if (next.incrementAndGet() == expectedAnswers.size) {
-            foundAllExpectedAnswers.trySuccess(())
-          }
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
        }
      }
    }
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
        |=======================
        |Spark SQL CLI command line: ${command.mkString(" ")}
        |
-       |Executed query ${next.get()} "${queries(next.get())}",
-       |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+       |Executed query $next "${queries(next)}",
+       |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
        |
        |${buffer.mkString("\n")}
        |===========================
