@@ -842,71 +842,87 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
842
842
assert(Arrays .equals(notMappedAsArray, bytes))
843
843
}
844
844
845
- // test("updated block statuses") {
846
- // store = makeBlockManager(12000)
847
- // val list = List.fill(2)(new Array[Byte](2000))
848
- // val bigList = List.fill(8)(new Array[Byte](2000))
849
- //
850
- // // 1 updated block (i.e. list1)
851
- // val updatedBlocks1 =
852
- // store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
853
- // assert(updatedBlocks1.size === 1)
854
- // assert(updatedBlocks1.head._1 === TestBlockId("list1"))
855
- // assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
856
- //
857
- // // 1 updated block (i.e. list2)
858
- // val updatedBlocks2 =
859
- // store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
860
- // assert(updatedBlocks2.size === 1)
861
- // assert(updatedBlocks2.head._1 === TestBlockId("list2"))
862
- // assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
863
- //
864
- // // 2 updated blocks - list1 is kicked out of memory while list3 is added
865
- // val updatedBlocks3 =
866
- // store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
867
- // assert(updatedBlocks3.size === 2)
868
- // updatedBlocks3.foreach { case (id, status) =>
869
- // id match {
870
- // case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
871
- // case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
872
- // case _ => fail("Updated block is neither list1 nor list3")
873
- // }
874
- // }
875
- // assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
876
- //
877
- // // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
878
- // val updatedBlocks4 =
879
- // store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
880
- // assert(updatedBlocks4.size === 2)
881
- // updatedBlocks4.foreach { case (id, status) =>
882
- // id match {
883
- // case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
884
- // case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
885
- // case _ => fail("Updated block is neither list2 nor list4")
886
- // }
887
- // }
888
- // assert(store.diskStore.contains("list2"), "list2 was not in disk store")
889
- // assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
890
- //
891
- // // No updated blocks - list5 is too big to fit in store and nothing is kicked out
892
- // val updatedBlocks5 =
893
- // store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
894
- // assert(updatedBlocks5.size === 0)
895
- //
896
- // // memory store contains only list3 and list4
897
- // assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
898
- // assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
899
- // assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
900
- // assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
901
- // assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
902
- //
903
- // // disk store contains only list2
904
- // assert(!store.diskStore.contains("list1"), "list1 was in disk store")
905
- // assert(store.diskStore.contains("list2"), "list2 was not in disk store")
906
- // assert(!store.diskStore.contains("list3"), "list3 was in disk store")
907
- // assert(!store.diskStore.contains("list4"), "list4 was in disk store")
908
- // assert(!store.diskStore.contains("list5"), "list5 was in disk store")
909
- // }
845
+ test(" updated block statuses" ) {
846
+ store = makeBlockManager(12000 )
847
+ val list = List .fill(2 )(new Array [Byte ](2000 ))
848
+ val bigList = List .fill(8 )(new Array [Byte ](2000 ))
849
+
850
+ def getUpdatedBlocks (task : => Unit ): Seq [(BlockId , BlockStatus )] = {
851
+ val context = TaskContext .empty()
852
+ try {
853
+ TaskContext .setTaskContext(context)
854
+ task
855
+ } finally {
856
+ TaskContext .unset()
857
+ }
858
+ context.taskMetrics.updatedBlocks.getOrElse(Seq .empty)
859
+ }
860
+
861
+ // 1 updated block (i.e. list1)
862
+ val updatedBlocks1 = getUpdatedBlocks {
863
+ store.putIterator(" list1" , list.iterator, StorageLevel .MEMORY_ONLY , tellMaster = true )
864
+ }
865
+ assert(updatedBlocks1.size === 1 )
866
+ assert(updatedBlocks1.head._1 === TestBlockId (" list1" ))
867
+ assert(updatedBlocks1.head._2.storageLevel === StorageLevel .MEMORY_ONLY )
868
+
869
+ // 1 updated block (i.e. list2)
870
+ val updatedBlocks2 = getUpdatedBlocks {
871
+ store.putIterator(" list2" , list.iterator, StorageLevel .MEMORY_AND_DISK , tellMaster = true )
872
+ }
873
+ assert(updatedBlocks2.size === 1 )
874
+ assert(updatedBlocks2.head._1 === TestBlockId (" list2" ))
875
+ assert(updatedBlocks2.head._2.storageLevel === StorageLevel .MEMORY_ONLY )
876
+
877
+ // 2 updated blocks - list1 is kicked out of memory while list3 is added
878
+ val updatedBlocks3 = getUpdatedBlocks {
879
+ store.putIterator(" list3" , list.iterator, StorageLevel .MEMORY_ONLY , tellMaster = true )
880
+ }
881
+ assert(updatedBlocks3.size === 2 )
882
+ updatedBlocks3.foreach { case (id, status) =>
883
+ id match {
884
+ case TestBlockId (" list1" ) => assert(status.storageLevel === StorageLevel .NONE )
885
+ case TestBlockId (" list3" ) => assert(status.storageLevel === StorageLevel .MEMORY_ONLY )
886
+ case _ => fail(" Updated block is neither list1 nor list3" )
887
+ }
888
+ }
889
+ assert(store.memoryStore.contains(" list3" ), " list3 was not in memory store" )
890
+
891
+ // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
892
+ val updatedBlocks4 = getUpdatedBlocks {
893
+ store.putIterator(" list4" , list.iterator, StorageLevel .MEMORY_ONLY , tellMaster = true )
894
+ }
895
+ assert(updatedBlocks4.size === 2 )
896
+ updatedBlocks4.foreach { case (id, status) =>
897
+ id match {
898
+ case TestBlockId (" list2" ) => assert(status.storageLevel === StorageLevel .DISK_ONLY )
899
+ case TestBlockId (" list4" ) => assert(status.storageLevel === StorageLevel .MEMORY_ONLY )
900
+ case _ => fail(" Updated block is neither list2 nor list4" )
901
+ }
902
+ }
903
+ assert(store.diskStore.contains(" list2" ), " list2 was not in disk store" )
904
+ assert(store.memoryStore.contains(" list4" ), " list4 was not in memory store" )
905
+
906
+ // No updated blocks - list5 is too big to fit in store and nothing is kicked out
907
+ val updatedBlocks5 = getUpdatedBlocks {
908
+ store.putIterator(" list5" , bigList.iterator, StorageLevel .MEMORY_ONLY , tellMaster = true )
909
+ }
910
+ assert(updatedBlocks5.size === 0 )
911
+
912
+ // memory store contains only list3 and list4
913
+ assert(! store.memoryStore.contains(" list1" ), " list1 was in memory store" )
914
+ assert(! store.memoryStore.contains(" list2" ), " list2 was in memory store" )
915
+ assert(store.memoryStore.contains(" list3" ), " list3 was not in memory store" )
916
+ assert(store.memoryStore.contains(" list4" ), " list4 was not in memory store" )
917
+ assert(! store.memoryStore.contains(" list5" ), " list5 was in memory store" )
918
+
919
+ // disk store contains only list2
920
+ assert(! store.diskStore.contains(" list1" ), " list1 was in disk store" )
921
+ assert(store.diskStore.contains(" list2" ), " list2 was not in disk store" )
922
+ assert(! store.diskStore.contains(" list3" ), " list3 was in disk store" )
923
+ assert(! store.diskStore.contains(" list4" ), " list4 was in disk store" )
924
+ assert(! store.diskStore.contains(" list5" ), " list5 was in disk store" )
925
+ }
910
926
911
927
test(" query block statuses" ) {
912
928
store = makeBlockManager(12000 )
0 commit comments