Skip to content

Commit c8413c8

Browse files
committed
KE-37170 fix broadcastManager NPE (apache#480)
1 parent a2f6a77 commit c8413c8

File tree

1 file changed

+22
-20
lines changed

1 file changed

+22
-20
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -363,26 +363,28 @@ class BlockManagerMasterEndpoint(
363363
while (iterator.hasNext) {
364364
val blockId = iterator.next
365365
val locations = blockLocations.get(blockId)
366-
locations -= blockManagerId
367-
// De-register the block if none of the block managers have it. Otherwise, if pro-active
368-
// replication is enabled, and a block is either an RDD or a test block (the latter is used
369-
// for unit testing), we send a message to a randomly chosen executor location to replicate
370-
// the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
371-
// etc.) as replication doesn't make much sense in that context.
372-
if (locations.size == 0) {
373-
blockLocations.remove(blockId)
374-
logWarning(s"No more replicas available for $blockId !")
375-
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
376-
// As a heuristic, assume single executor failure to find out the number of replicas that
377-
// existed before failure
378-
val maxReplicas = locations.size + 1
379-
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
380-
val blockLocations = locations.toSeq
381-
val candidateBMId = blockLocations(i)
382-
blockManagerInfo.get(candidateBMId).foreach { bm =>
383-
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
384-
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
385-
bm.storageEndpoint.ask[Boolean](replicateMsg)
366+
if(locations != null) {
367+
locations -= blockManagerId
368+
// De-register the block if none of the block managers have it. Otherwise, if pro-active
369+
// replication is enabled, and a block is either an RDD or a test block (the latter is used
370+
// for unit testing), we send a message to a randomly chosen executor location to replicate
371+
// the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
372+
// etc.) as replication doesn't make much sense in that context.
373+
if (locations.size == 0) {
374+
blockLocations.remove(blockId)
375+
logWarning(s"No more replicas available for $blockId !")
376+
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
377+
// As a heuristic, assume single executor failure to find out the number of replicas that
378+
// existed before failure
379+
val maxReplicas = locations.size + 1
380+
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
381+
val blockLocations = locations.toSeq
382+
val candidateBMId = blockLocations(i)
383+
blockManagerInfo.get(candidateBMId).foreach { bm =>
384+
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
385+
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
386+
bm.storageEndpoint.ask[Boolean](replicateMsg)
387+
}
386388
}
387389
}
388390
}

0 commit comments

Comments
 (0)