This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit a35a67a

zsxwing authored and marmbrus committed
[SPARK-14579][SQL] Fix the race condition in StreamExecution.processAllAvailable again
## What changes were proposed in this pull request?

apache#12339 didn't fix the race condition. MemorySinkSuite is still flaky: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/814/testReport/junit/org.apache.spark.sql.streaming/MemorySinkSuite/registering_as_a_table/

Here is an execution order that reproduces it:

| Time | Thread 1 | MicroBatchThread |
|:---:|:---|:---|
| 1 | | `MemorySink.getOffset` |
| 2 | | `availableOffsets ++= newData` (availableOffsets is not changed here) |
| 3 | `addData(newData)` | |
| 4 | Set `noNewData` to `false` in `processAllAvailable` | |
| 5 | | `dataAvailable` returns `false` |
| 6 | | `noNewData = true` |
| 7 | `noNewData` is `true`, so just return | |
| 8 | assert results and fail | |
| 9 | | `dataAvailable` returns `true`, so process the new batch |

This PR expands the scope of `awaitBatchLock.synchronized` to eliminate the above race.

## How was this patch tested?

test("stress test"). It always failed before this patch and passes after applying it. The test is ignored in this PR as it takes several minutes to finish.

Author: Shixiong Zhu <[email protected]>

Closes apache#12582 from zsxwing/SPARK-14579-2.
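The fix can be illustrated with a simplified, self-contained model (hypothetical names; this is a sketch, not the real `StreamExecution` code): once the offset fetch and the `dataAvailable` check share one `awaitBatchLock.synchronized` block, step 3's `addData` can no longer interleave between steps 2 and 5 without also forcing a re-check under the lock.

```scala
// Hypothetical sketch of the locking fix. Before the patch, the offset
// fetch ran OUTSIDE the lock, so a concurrent addData could slip in
// between the fetch and the dataAvailable check, leaving noNewData == true
// while a just-added batch was still unprocessed.
object LockScopeSketch {
  private val awaitBatchLock = new Object
  private var availableOffsets = Set.empty[Long] // offsets known to the engine
  private var committedOffsets = Set.empty[Long] // offsets already processed
  private var noNewData = false

  private def dataAvailable: Boolean =
    (availableOffsets -- committedOffsets).nonEmpty

  // The fix: fetch offsets AND check dataAvailable under one lock.
  def pollSources(sourceOffsets: Set[Long]): Boolean =
    awaitBatchLock.synchronized {
      availableOffsets ++= sourceOffsets // now inside the lock
      if (dataAvailable) {
        true
      } else {
        noNewData = true // safe: no addData can race with the check above
        false
      }
    }

  def processBatch(): Unit = awaitBatchLock.synchronized {
    committedOffsets ++= availableOffsets
  }

  def main(args: Array[String]): Unit = {
    assert(!pollSources(Set.empty)) // nothing yet: noNewData is set
    assert(pollSources(Set(1L)))    // new offset observed: batch pending
    processBatch()
    assert(!pollSources(Set(1L)))   // already committed: no new data
    println("ok")
  }
}
```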
1 parent 9927441 commit a35a67a

File tree

2 files changed: +14 −5 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 5 additions & 5 deletions

```diff
@@ -242,12 +242,12 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val newData = microBatchThread.runUninterruptibly {
-      uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-    }
-    availableOffsets ++= newData
-
     val hasNewData = awaitBatchLock.synchronized {
+      val newData = microBatchThread.runUninterruptibly {
+        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+      }
+      availableOffsets ++= newData
+
       if (dataAvailable) {
         true
       } else {
```
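For context, the waiting side can be sketched as follows (simplified, hypothetical code; the real `processAllAvailable` also resets `noNewData` and handles stream termination): it takes the same `awaitBatchLock` before checking the flag, which is why widening the lock on the batch thread above is sufficient to close the race.

```scala
// Hypothetical sketch of the wait/notify handshake on awaitBatchLock.
// Both sides hold the same monitor, so the waiter can never observe a
// stale noNewData while the batch thread is mid-poll.
object AwaitSketch {
  private val awaitBatchLock = new Object
  private var noNewData = false

  // Batch thread: report that all current data has been seen, under the lock.
  def signalNoNewData(): Unit = awaitBatchLock.synchronized {
    noNewData = true
    awaitBatchLock.notifyAll()
  }

  // Caller thread: block until the batch thread signals, under the same lock.
  def awaitNoNewData(): Unit = awaitBatchLock.synchronized {
    while (!noNewData) awaitBatchLock.wait(10) // loop guards against spurious wakeups
  }

  def main(args: Array[String]): Unit = {
    val waiter = new Thread(() => awaitNoNewData())
    waiter.start()
    signalNoNewData()
    waiter.join(5000)
    assert(!waiter.isAlive) // waiter observed the signal and returned
    println("ok")
  }
}
```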

sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala

Lines changed: 9 additions & 0 deletions

```diff
@@ -26,6 +26,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
   import testImplicits._

   test("registering as a table") {
+    testRegisterAsTable()
+  }
+
+  ignore("stress test") {
+    // Ignore the stress test as it takes several minutes to run
+    (0 until 1000).foreach(_ => testRegisterAsTable())
+  }
+
+  private def testRegisterAsTable(): Unit = {
     val input = MemoryStream[Int]
     val query = input.toDF().write
       .format("memory")
```
