Skip to content

Commit 0a21de2

Browse files
ericlyhuai
authored andcommitted
[SC-6106] Atomic commit protocol unit test hangs in HashMap.elemEquals
## What changes were proposed in this pull request? We saw the following hang during a jenkins run: ``` "Executor task launch worker for task 281" apache#24085 daemon prio=5 os_prio=0 tid=0x00007fa140696000 nid=0x78fc runnable [0x00007fa06e7f7000] java.lang.Thread.State: RUNNABLE at java.net.URI.equals(URI.java:1421) at org.apache.hadoop.fs.Path.equals(Path.java:392) at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:358) at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40) at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:136) at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:165) at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.put(HashMap.scala:76) at scala.collection.mutable.HashMap.update(HashMap.scala:81) at org.apache.spark.sql.transaction.DatabricksAtomicCommitProtocolSuite$FakeClockFileSystem.create(DatabricksAtomicCommitProtocolSuite.scala:355) ``` Since URI.equals is non-blocking and the thread is at 100% cpu, the hash map must have gotten a loop in it due to concurrent accesses. The fix should be to synchronize on accesses to that map during tests. ## How was this patch tested? Ran existing tests. Author: Eric Liang <[email protected]> Closes apache#269 from ericl/sc-6106.
1 parent a91cbd3 commit 0a21de2

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,9 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
253253

254254
test("reader re-lists directory when files may be missing from the initial list") {
255255
withTempDir { dir =>
256-
var listCount = 0
256+
@volatile var listCount = 0
257257
val fs = new RawLocalFileSystem() {
258-
override def listStatus(path: Path): Array[FileStatus] = {
258+
override def listStatus(path: Path): Array[FileStatus] = synchronized {
259259
listCount += 1
260260
super.listStatus(path)
261261
}
@@ -311,9 +311,9 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
311311

312312
test("re-list is avoided after a grace period") {
313313
withTempDir { dir =>
314-
var listCount = 0
314+
@volatile var listCount = 0
315315
val fs = new RawLocalFileSystem() {
316-
override def listStatus(path: Path): Array[FileStatus] = {
316+
override def listStatus(path: Path): Array[FileStatus] = synchronized {
317317
listCount += 1
318318
super.listStatus(path)
319319
}
@@ -351,13 +351,13 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
351351

352352
// override file creation to save our custom timestamps
353353
override def create(path: Path): FSDataOutputStream = create(path, false)
354-
override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
354+
override def create(path: Path, overwrite: Boolean): FSDataOutputStream = synchronized {
355355
creationTimes(path.makeQualified(this)) = clock.getTimeMillis()
356356
super.create(path, overwrite)
357357
}
358358

359359
// fill in our custom timestamps on list
360-
override def listStatus(path: Path): Array[FileStatus] = {
360+
override def listStatus(path: Path): Array[FileStatus] = synchronized {
361361
super.listStatus(path).map { stat =>
362362
new FileStatus(
363363
stat.getLen,
@@ -589,10 +589,10 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
589589
* Emulates S3 list consistency guarantees. We assume read-after-write for single keys,
590590
* however a list call is not atomic and so may observe writes out of order.
591591
*/
592-
var numLists = 0
592+
@volatile var numLists = 0
593593
val inconsistentFs = new RawLocalFileSystem() {
594594
val consistentFiles = mutable.Set[Path]()
595-
override def listStatus(path: Path): Array[FileStatus] = {
595+
override def listStatus(path: Path): Array[FileStatus] = synchronized {
596596
numLists += 1
597597
super.listStatus(path).filter { stat =>
598598
stat.getPath match {
@@ -648,7 +648,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
648648
val clock = new ManualClock(123456789L)
649649

650650
val fakeFs = new RawLocalFileSystem() {
651-
override def listStatus(path: Path): Array[FileStatus] = {
651+
override def listStatus(path: Path): Array[FileStatus] = synchronized {
652652
super.listStatus(path).map { stat =>
653653
new FileStatus(
654654
stat.getLen,

0 commit comments

Comments
 (0)