Skip to content

Commit 50607ec

Browse files
Neelesh Srinivas Salian
authored and tdas committed
[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming when StreamingContext is closed
Issue link: https://issues.apache.org/jira/browse/SPARK-8743. Deregister Codahale metrics for streaming when the StreamingContext is closed. Design: add the method calls to the appropriate start() and stop() methods of the StreamingContext. Actions in the pull request: 1) Added the registerSource method call to the start method of the StreamingContext. 2) Added the removeSource method call to the stop method. 3) Added comments for both 1 and 2, and a comment to show the initialization of the StreamingSource. 4) Added a test case to check both registration and de-registration of metrics. Previous closed PR for reference: #7250. Author: Neelesh Srinivas Salian <[email protected]>. Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits: 7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call; 8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import; 0e8007a [Neelesh Srinivas Salian] Moved import org.apache.spark{} to the correct place; daedaa5 [Neelesh Srinivas Salian] Corrected ordering of imports; 8873180 [Neelesh Srinivas Salian] Removed redundancy in imports; 59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports; d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration. (cherry picked from commit b7bcbe2) Signed-off-by: Tathagata Das <[email protected]>
1 parent 898e5f7 commit 50607ec

File tree

2 files changed

+45
-6
lines changed

2 files changed

+45
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,8 @@ class StreamingContext private[streaming] (
192192
None
193193
}
194194

195-
/** Register streaming source to metrics system */
195+
/* Initializing a streamingSource to register metrics */
196196
private val streamingSource = new StreamingSource(this)
197-
assert(env != null)
198-
assert(env.metricsSystem != null)
199-
env.metricsSystem.registerSource(streamingSource)
200197

201198
private var state: StreamingContextState = INITIALIZED
202199

@@ -606,6 +603,9 @@ class StreamingContext private[streaming] (
606603
}
607604
shutdownHookRef = Utils.addShutdownHook(
608605
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
606+
// Registering Streaming Metrics at the start of the StreamingContext
607+
assert(env.metricsSystem != null)
608+
env.metricsSystem.registerSource(streamingSource)
609609
uiTab.foreach(_.attach())
610610
logInfo("StreamingContext started")
611611
case ACTIVE =>
@@ -682,6 +682,8 @@ class StreamingContext private[streaming] (
682682
logWarning("StreamingContext has already been stopped")
683683
case ACTIVE =>
684684
scheduler.stop(stopGracefully)
685+
// Removing the streamingSource to de-register the metrics on stop()
686+
env.metricsSystem.removeSource(streamingSource)
685687
uiTab.foreach(_.detach())
686688
StreamingContext.setActiveContext(null)
687689
waiter.notifyStop()

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,23 @@ package org.apache.spark.streaming
2020
import java.io.{File, NotSerializableException}
2121
import java.util.concurrent.atomic.AtomicInteger
2222

23+
import scala.collection.mutable.ArrayBuffer
2324
import scala.collection.mutable.Queue
2425

2526
import org.apache.commons.io.FileUtils
27+
import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester}
2628
import org.scalatest.concurrent.Eventually._
2729
import org.scalatest.concurrent.Timeouts
2830
import org.scalatest.exceptions.TestFailedDueToTimeoutException
2931
import org.scalatest.time.SpanSugar._
30-
import org.scalatest.{Assertions, BeforeAndAfter}
3132

33+
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
34+
import org.apache.spark.metrics.MetricsSystem
35+
import org.apache.spark.metrics.source.Source
3236
import org.apache.spark.storage.StorageLevel
3337
import org.apache.spark.streaming.dstream.DStream
3438
import org.apache.spark.streaming.receiver.Receiver
3539
import org.apache.spark.util.Utils
36-
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}
3740

3841

3942
class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
@@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
299302
Thread.sleep(100)
300303
}
301304

305+
test ("registering and de-registering of streamingSource") {
306+
val conf = new SparkConf().setMaster(master).setAppName(appName)
307+
ssc = new StreamingContext(conf, batchDuration)
308+
assert(ssc.getState() === StreamingContextState.INITIALIZED)
309+
addInputStream(ssc).register()
310+
ssc.start()
311+
312+
val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
313+
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
314+
assert(sources.contains(streamingSource))
315+
assert(ssc.getState() === StreamingContextState.ACTIVE)
316+
317+
ssc.stop()
318+
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
319+
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
320+
assert(ssc.getState() === StreamingContextState.STOPPED)
321+
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
322+
}
323+
302324
test("awaitTermination") {
303325
ssc = new StreamingContext(master, appName, batchDuration)
304326
val inputStream = addInputStream(ssc)
@@ -811,3 +833,18 @@ package object testPackage extends Assertions {
811833
}
812834
}
813835
}
836+
837+
/**
838+
* Helper methods for testing StreamingContextSuite
839+
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem
840+
*/
841+
private object StreamingContextSuite extends PrivateMethodTester {
842+
private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
843+
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
844+
metricsSystem.invokePrivate(_sources())
845+
}
846+
private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
847+
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
848+
streamingContext.invokePrivate(_streamingSource())
849+
}
850+
}

0 commit comments

Comments
 (0)