Skip to content

Commit cd12dd9

Browse files
committed
[SPARK-1617] and [SPARK-1618] Improvements to streaming ui and bug fix to socket receiver
1617: These changes expose the receiver state (active or inactive) and last error in the UI 1618: If the socket receiver cannot connect in the first attempt, it should try to restart after a delay. That was broken, as the thread that restarts (hence, stops) the receiver waited on Thread.join on itself! Author: Tathagata Das <[email protected]> Closes #540 from tdas/streaming-ui-fix and squashes the following commits: e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-ui-fix dbddf75 [Tathagata Das] Style fix. 66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-ui-fix ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo. d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui" 5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state and error visible in the streamign UI.
1 parent 968c018 commit cd12dd9

File tree

15 files changed

+217
-103
lines changed

15 files changed

+217
-103
lines changed

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private[spark] object UIUtils extends Logging {
122122
}
123123
}
124124
if (unit.isEmpty) {
125-
"%d".formatLocal(Locale.US, value)
125+
"%d".formatLocal(Locale.US, value.toInt)
126126
} else {
127127
"%.1f%s".formatLocal(Locale.US, value, unit)
128128
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag](
5050
storageLevel: StorageLevel
5151
) extends Receiver[T](storageLevel) with Logging {
5252

53-
var socket: Socket = null
54-
var receivingThread: Thread = null
55-
5653
def onStart() {
57-
receivingThread = new Thread("Socket Receiver") {
58-
override def run() {
59-
connect()
60-
receive()
61-
}
62-
}
63-
receivingThread.start()
54+
// Start the thread that receives data over a connection
55+
new Thread("Socket Receiver") {
56+
setDaemon(true)
57+
override def run() { receive() }
58+
}.start()
6459
}
6560

6661
def onStop() {
67-
if (socket != null) {
68-
socket.close()
69-
}
70-
socket = null
71-
if (receivingThread != null) {
72-
receivingThread.join()
73-
}
62+
// There is nothing much to do as the thread calling receive()
63+
// is designed to stop by itself isStopped() returns false
7464
}
7565

76-
def connect() {
66+
/** Create a socket connection and receive data until receiver is stopped */
67+
def receive() {
68+
var socket: Socket = null
7769
try {
7870
logInfo("Connecting to " + host + ":" + port)
7971
socket = new Socket(host, port)
80-
} catch {
81-
case e: Exception =>
82-
restart("Could not connect to " + host + ":" + port, e)
83-
}
84-
}
85-
86-
def receive() {
87-
try {
8872
logInfo("Connected to " + host + ":" + port)
8973
val iterator = bytesToObjects(socket.getInputStream())
9074
while(!isStopped && iterator.hasNext) {
9175
store(iterator.next)
9276
}
77+
logInfo("Stopped receiving")
78+
restart("Retrying connecting to " + host + ":" + port)
9379
} catch {
94-
case e: Exception =>
95-
restart("Error receiving data from socket", e)
80+
case e: java.net.ConnectException =>
81+
restart("Error connecting to " + host + ":" + port, e)
82+
case t: Throwable =>
83+
restart("Error receiving data", t)
84+
} finally {
85+
if (socket != null) {
86+
socket.close()
87+
logInfo("Closed socket to " + host + ":" + port)
88+
}
9689
}
9790
}
9891
}

streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart}
2828
import org.apache.spark.{Logging, SparkEnv}
2929
import org.apache.spark.storage.StorageLevel
3030
import java.nio.ByteBuffer
31+
import org.apache.spark.annotation.DeveloperApi
3132

32-
/** A helper with set of defaults for supervisor strategy */
33+
/**
34+
* :: DeveloperApi ::
35+
* A helper with set of defaults for supervisor strategy
36+
*/
37+
@DeveloperApi
3338
object ActorSupervisorStrategy {
3439

3540
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
@@ -40,6 +45,7 @@ object ActorSupervisorStrategy {
4045
}
4146

4247
/**
48+
* :: DeveloperApi ::
4349
* A receiver trait to be mixed in with your Actor to gain access to
4450
* the API for pushing received data into Spark Streaming for being processed.
4551
*
@@ -61,6 +67,7 @@ object ActorSupervisorStrategy {
6167
* to ensure the type safety, i.e parametrized type of push block and InputDStream
6268
* should be same.
6369
*/
70+
@DeveloperApi
6471
trait ActorHelper {
6572

6673
self: Actor => // to ensure that this can be added to Actor classes only
@@ -92,10 +99,12 @@ trait ActorHelper {
9299
}
93100

94101
/**
102+
* :: DeveloperApi ::
95103
* Statistics for querying the supervisor about state of workers. Used in
96104
* conjunction with `StreamingContext.actorStream` and
97105
* [[org.apache.spark.streaming.receiver.ActorHelper]].
98106
*/
107+
@DeveloperApi
99108
case class Statistics(numberOfMsgs: Int,
100109
numberOfWorkers: Int,
101110
numberOfHiccups: Int,
@@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag](
188197
supervisor ! PoisonPill
189198
}
190199
}
191-

streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer
2323
import scala.collection.JavaConversions._
2424

2525
import org.apache.spark.storage.StorageLevel
26+
import org.apache.spark.annotation.DeveloperApi
2627

2728
/**
29+
* :: DeveloperApi ::
2830
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
2931
* custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
3032
* should define the setup steps necessary to start receiving data,
@@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
5153
* }
5254
* }}}
5355
*/
56+
@DeveloperApi
5457
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
5558

5659
/**
@@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
198201

199202
/** Check if receiver has been marked for stopping. */
200203
def isStopped(): Boolean = {
201-
!executor.isReceiverStarted()
204+
executor.isReceiverStopped()
202205
}
203206

204207
/** Get unique identifier of this receiver. */

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@
1818
package org.apache.spark.streaming.receiver
1919

2020
/** Messages sent to the NetworkReceiver. */
21-
private[streaming] sealed trait NetworkReceiverMessage
22-
private[streaming] object StopReceiver extends NetworkReceiverMessage
21+
private[streaming] sealed trait ReceiverMessage
22+
private[streaming] object StopReceiver extends ReceiverMessage
2323

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -88,22 +88,38 @@ private[streaming] abstract class ReceiverSupervisor(
8888
/** Report errors. */
8989
def reportError(message: String, throwable: Throwable)
9090

91-
/** Start the executor */
91+
/** Called when supervisor is started */
92+
protected def onStart() { }
93+
94+
/** Called when supervisor is stopped */
95+
protected def onStop(message: String, error: Option[Throwable]) { }
96+
97+
/** Called when receiver is started */
98+
protected def onReceiverStart() { }
99+
100+
/** Called when receiver is stopped */
101+
protected def onReceiverStop(message: String, error: Option[Throwable]) { }
102+
103+
/** Start the supervisor */
92104
def start() {
105+
onStart()
93106
startReceiver()
94107
}
95108

96-
/** Mark the executor and the receiver for stopping */
109+
/** Mark the supervisor and the receiver for stopping */
97110
def stop(message: String, error: Option[Throwable]) {
98111
stoppingError = error.orNull
99112
stopReceiver(message, error)
113+
onStop(message, error)
100114
stopLatch.countDown()
101115
}
102116

103117
/** Start receiver */
104118
def startReceiver(): Unit = synchronized {
105119
try {
106120
logInfo("Starting receiver")
121+
receiver.onStart()
122+
logInfo("Called receiver onStart")
107123
onReceiverStart()
108124
receiverState = Started
109125
} catch {
@@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor(
115131
/** Stop receiver */
116132
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
117133
try {
134+
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
118135
receiverState = Stopped
136+
receiver.onStop()
137+
logInfo("Called receiver onStop")
119138
onReceiverStop(message, error)
120139
} catch {
121140
case t: Throwable =>
@@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor(
130149

131150
/** Restart receiver with delay */
132151
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
133-
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
134-
error.getOrElse(null))
135-
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
136-
future {
152+
Future {
153+
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
154+
error.getOrElse(null))
155+
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
137156
logDebug("Sleeping for " + delay)
138157
Thread.sleep(delay)
139-
logDebug("Starting receiver again")
158+
logInfo("Starting receiver again")
140159
startReceiver()
141160
logInfo("Receiver started again")
142161
}
143162
}
144163

145-
/** Called when the receiver needs to be started */
146-
protected def onReceiverStart(): Unit = synchronized {
147-
// Call user-defined onStart()
148-
logInfo("Calling receiver onStart")
149-
receiver.onStart()
150-
logInfo("Called receiver onStart")
151-
}
152-
153-
/** Called when the receiver needs to be stopped */
154-
protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
155-
// Call user-defined onStop()
156-
logInfo("Calling receiver onStop")
157-
receiver.onStop()
158-
logInfo("Called receiver onStop")
159-
}
160-
161164
/** Check if receiver has been marked for stopping */
162165
def isReceiverStarted() = {
163166
logDebug("state = " + receiverState)
164167
receiverState == Started
165168
}
166169

167-
/** Wait the thread until the executor is stopped */
170+
/** Check if receiver has been marked for stopping */
171+
def isReceiverStopped() = {
172+
logDebug("state = " + receiverState)
173+
receiverState == Stopped
174+
}
175+
176+
177+
/** Wait the thread until the supervisor is stopped */
168178
def awaitTermination() {
169179
stopLatch.await()
170180
logInfo("Waiting for executor stop is over")

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl(
7979
logInfo("Received stop signal")
8080
stop("Stopped by driver", None)
8181
}
82+
83+
def ref = self
8284
}), "Receiver-" + streamId + "-" + System.currentTimeMillis())
8385

8486
/** Unique block ids if one wants to add blocks directly */
@@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl(
154156
logWarning("Reported error " + message + " - " + error)
155157
}
156158

157-
override def onReceiverStart() {
159+
override protected def onStart() {
158160
blockGenerator.start()
159-
super.onReceiverStart()
160161
}
161162

162-
override def onReceiverStop(message: String, error: Option[Throwable]) {
163-
super.onReceiverStop(message, error)
163+
override protected def onStop(message: String, error: Option[Throwable]) {
164164
blockGenerator.stop()
165+
env.actorSystem.stop(actor)
166+
}
167+
168+
override protected def onReceiverStart() {
169+
val msg = RegisterReceiver(
170+
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
171+
val future = trackerActor.ask(msg)(askTimeout)
172+
Await.result(future, askTimeout)
173+
}
174+
175+
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
165176
logInfo("Deregistering receiver " + streamId)
166177
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
167178
val future = trackerActor.ask(
@@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl(
170181
logInfo("Stopped receiver " + streamId)
171182
}
172183

173-
override def stop(message: String, error: Option[Throwable]) {
174-
super.stop(message, error)
175-
env.actorSystem.stop(actor)
176-
}
177-
178184
/** Generate new block ID */
179185
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
180186
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818
package org.apache.spark.streaming.scheduler
1919

2020
import org.apache.spark.streaming.Time
21+
import org.apache.spark.annotation.DeveloperApi
2122

2223
/**
24+
* :: DeveloperApi ::
2325
* Class having information on completed batches.
2426
* @param batchTime Time of the batch
2527
* @param submissionTime Clock time of when jobs of this batch was submitted to
2628
* the streaming scheduler queue
2729
* @param processingStartTime Clock time of when the first job of this batch started processing
2830
* @param processingEndTime Clock time of when the last job of this batch finished processing
2931
*/
32+
@DeveloperApi
3033
case class BatchInfo(
3134
batchTime: Time,
3235
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.scheduler
19+
20+
import akka.actor.ActorRef
21+
import org.apache.spark.annotation.DeveloperApi
22+
23+
/**
24+
* :: DeveloperApi ::
25+
* Class having information about a receiver
26+
*/
27+
@DeveloperApi
28+
case class ReceiverInfo(
29+
streamId: Int,
30+
name: String,
31+
private[streaming] val actor: ActorRef,
32+
active: Boolean,
33+
location: String,
34+
lastErrorMessage: String = "",
35+
lastError: String = ""
36+
) {
37+
}

0 commit comments

Comments
 (0)