Skip to content

Commit d5d86f6

Browse files
committed
Fix incorrect lastErrorTime
1 parent 3be4b7a commit d5d86f6

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
154154
private def deregisterReceiver(streamId: Int, message: String, error: String) {
155155
val newReceiverInfo = receiverInfo.get(streamId) match {
156156
case Some(oldInfo) =>
157+
val lastErrorTime =
158+
if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
157159
oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message,
158-
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
160+
lastError = error, lastErrorTime = lastErrorTime)
159161
case None =>
160162
logWarning("No prior receiver info")
163+
val lastErrorTime =
164+
if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
161165
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
162-
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
166+
lastError = error, lastErrorTime = lastErrorTime)
163167
}
164168
receiverInfo -= streamId
165169
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))

0 commit comments

Comments
 (0)