Skip to content

Commit eddf9fd

Browse files
committed
Fix the race condition in StreamExecution.processAllAvailable again
1 parent 3a21e8d commit eddf9fd

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,12 @@ class StreamExecution(
241241
// method. See SPARK-14131.
242242
//
243243
// Check to see what new data is available.
244-
val newData = microBatchThread.runUninterruptibly {
245-
uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
246-
}
247-
availableOffsets ++= newData
248-
249244
val hasNewData = awaitBatchLock.synchronized {
245+
val newData = microBatchThread.runUninterruptibly {
246+
uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
247+
}
248+
availableOffsets ++= newData
249+
250250
if (dataAvailable) {
251251
true
252252
} else {

0 commit comments

Comments
 (0)