 package org.apache.spark.streaming.dstream

-import java.util.concurrent.ArrayBlockingQueue
 import java.nio.ByteBuffer
+import java.util.concurrent.ArrayBlockingQueue

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.reflect.ClassTag

-import akka.actor.{Props, Actor}
+import akka.actor.{Actor, Props}
 import akka.pattern.ask

-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.streaming._
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{RDD, BlockRDD}
+import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver}
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.util.Utils
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -48,8 +49,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
 abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {

-  // This is a unique identifier that is used to match the network receiver with the
-  // corresponding network input stream.
+  /** Keeps information on all received blocks. */
+  private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+  /** This is a unique identifier for the network input stream. */
   val id = ssc.getNewNetworkStreamId()

 /**
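The new `receivedBlockInfo` map keys received-block metadata by batch time, so each generated batch can later be matched to exactly the blocks it consumed, and stale entries can be dropped by time. A minimal sketch of that bookkeeping pattern, using simplified stand-in types (the `Time` and `ReceivedBlockInfo` below are illustrative, not the real Spark classes):

```scala
import scala.collection.mutable.HashMap

// Simplified stand-ins for the real Spark Streaming types.
case class Time(ms: Long)
case class ReceivedBlockInfo(blockId: String, numRecords: Long)

class BlockBookkeeping {
  private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

  // Remember which blocks a given batch consumed.
  def record(batchTime: Time, blocks: Array[ReceivedBlockInfo]): Unit =
    receivedBlockInfo(batchTime) = blocks

  // Drop entries at or before the cutoff (mirrors clearMetadata below).
  def clearOlderThan(cutoff: Time): Unit = {
    val old = receivedBlockInfo.keys.filter(_.ms <= cutoff.ms).toList
    receivedBlockInfo --= old
  }
}
```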
@@ -64,23 +67,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
   def stop() {}

+  /** Asks NetworkInputTracker for received data blocks and generates RDDs from them. */
   override def compute(validTime: Time): Option[RDD[T]] = {
     // If this is called for any time before the start time of the context,
     // then this returns an empty RDD. This may happen when recovering from a
     // master failure
     if (validTime >= graph.startTime) {
-      val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)
+      val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
+      receivedBlockInfo(validTime) = blockInfo
+      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
       Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
       Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
     }
   }
+
+  /** Get information on received blocks. */
+  private[streaming] def getReceivedBlockInfo(time: Time) = {
+    receivedBlockInfo(time)
+  }
+
+  /**
+   * Clear metadata that is older than `rememberDuration` of this DStream.
+   * This is an internal method that should not be called directly. This
+   * implementation overrides the default implementation to clear received
+   * block information.
+   */
+  private[streaming] override def clearMetadata(time: Time) {
+    super.clearMetadata(time)
+    val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+    receivedBlockInfo --= oldReceivedBlocks.keys
+    logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+      (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+  }
 }
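Because each batch now records its `ReceivedBlockInfo`, per-batch statistics such as record counts can be derived from it. A hedged sketch (the `recordsInBatch` helper is hypothetical, not part of this patch); note that the byte-buffer `pushBlock` further down reports `-1` records, so unknown counts are clamped to zero:

```scala
// Hypothetical helper: total records received in one batch.
// numRecords is -1 when the count is unknown (blocks pushed as raw bytes).
def recordsInBatch(infos: Array[ReceivedBlockInfo]): Long =
  infos.map(info => math.max(info.numRecords, 0L)).sum
```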

 private[streaming] sealed trait NetworkReceiverMessage
 private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
+private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any)
   extends NetworkReceiverMessage
 private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
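`ReportBlock` now carries a record count alongside the block ID, so the tracker can account for how much data each block holds. The messages form a small sealed hierarchy, which lets the compiler verify that actor-side pattern matches handle every case; a self-contained sketch of the same pattern (simplified payload types, for illustration only):

```scala
sealed trait ReceiverMessage
case class Report(blockId: String, numRecords: Long) extends ReceiverMessage
case class Fail(msg: String) extends ReceiverMessage

// The compiler warns if a match over a sealed trait misses a case.
def describe(message: ReceiverMessage): String = message match {
  case Report(id, n) => s"block $id with $n record(s)"
  case Fail(msg)     => s"receiver error: $msg"
}
```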
@@ -155,21 +180,30 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     actor ! ReportError(e.toString)
   }

-
   /**
    * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    */
-  def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+  def pushBlock(
+      blockId: StreamBlockId,
+      arrayBuffer: ArrayBuffer[T],
+      metadata: Any,
+      level: StorageLevel
+    ) {
     env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
-    actor ! ReportBlock(blockId, metadata)
+    actor ! ReportBlock(blockId, arrayBuffer.size, metadata)
   }

   /**
    * Pushes a block (as bytes) into the block manager.
    */
-  def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+  def pushBlock(
+      blockId: StreamBlockId,
+      bytes: ByteBuffer,
+      metadata: Any,
+      level: StorageLevel
+    ) {
     env.blockManager.putBytes(blockId, bytes, level)
-    actor ! ReportBlock(blockId, metadata)
+    actor ! ReportBlock(blockId, -1, metadata)
   }
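A hedged usage sketch of the `ArrayBuffer` variant, as a custom receiver implementation might call it. The way the `StreamBlockId` is constructed here (stream ID plus a timestamp) is an assumption for illustration; receivers typically let the `BlockGenerator` below assign block IDs:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.dstream.NetworkReceiver

// Illustrative only: hand a filled buffer to the block manager and
// report it to the tracker in one call.
def storeRecords(receiver: NetworkReceiver[String], streamId: Int,
                 records: ArrayBuffer[String]): Unit = {
  receiver.pushBlock(
    StreamBlockId(streamId, System.currentTimeMillis()), // assumed ID scheme
    records,
    null,                          // no extra metadata
    StorageLevel.MEMORY_ONLY_SER)  // storage policy is the receiver's choice
}
```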

   /** A helper actor that communicates with the NetworkInputTracker */
@@ -182,13 +216,15 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     val timeout = 5.seconds

     override def preStart() {
-      val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
+      val msg = RegisterReceiver(
+        streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
+      val future = tracker.ask(msg)(timeout)
       Await.result(future, timeout)
     }

     override def receive() = {
-      case ReportBlock(blockId, metadata) =>
-        tracker ! AddBlocks(streamId, Array(blockId), metadata)
+      case ReportBlock(blockId, numRecords, metadata) =>
+        tracker ! AddBlocks(ReceivedBlockInfo(streamId, blockId, numRecords, metadata))
       case ReportError(msg) =>
         tracker ! DeregisterReceiver(streamId, msg)
       case StopReceiver(msg) =>
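Registration is deliberately blocking: `preStart` sends `RegisterReceiver` (now including the receiver's class name and host) and waits for the tracker's acknowledgement, so no blocks are ever reported by an unregistered receiver. A minimal sketch of that ask-then-await pattern in plain Akka (the message type is left abstract):

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Illustrative only: send a message and block until the peer replies,
// failing fast with a TimeoutException if no answer arrives in time.
def registerBlocking(tracker: ActorRef, msg: Any): Unit = {
  implicit val timeout: Timeout = Timeout(5.seconds)
  val future = tracker ? msg
  Await.result(future, timeout.duration)
}
```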
@@ -210,7 +246,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   class BlockGenerator(storageLevel: StorageLevel)
     extends Serializable with Logging {

-    case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
+    case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)

     val clock = new SystemClock()
     val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
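`spark.streaming.blockInterval` (in milliseconds, default 200) sets how often buffered records are cut into a new block, and each block becomes one partition of the batch's `BlockRDD`. The parallelism of the first stage is therefore roughly the batch interval divided by the block interval; a quick arithmetic sketch:

```scala
// With a 2-second batch interval and the default 200 ms block interval,
// each receiver emits about 2000 / 200 = 10 blocks (partitions) per batch.
val batchIntervalMs = 2000L
val blockIntervalMs = 200L
val blocksPerBatch = batchIntervalMs / blockIntervalMs // = 10
```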