Skip to content

[SPARK-8743] [Streaming]: Deregister Codahale metrics for streaming when StreamingContext is closed #7362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,8 @@ class StreamingContext private[streaming] (
None
}

/** Register streaming source to metrics system */
/* Initializing a streamingSource to register metrics */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big deal, but I don't see why this was changed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It previously held the block of code that did the registration.
Here it simply initializes the streamingSource since the registration is moved into start()

private val streamingSource = new StreamingSource(this)
assert(env != null)
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)

private var state: StreamingContextState = INITIALIZED

Expand Down Expand Up @@ -606,6 +603,9 @@ class StreamingContext private[streaming] (
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
Expand Down Expand Up @@ -682,6 +682,8 @@ class StreamingContext private[streaming] (
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@ package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue

import org.apache.commons.io.FileUtils
import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
import org.scalatest.{Assertions, BeforeAndAfter}

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}


class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
Expand Down Expand Up @@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
Thread.sleep(100)
}

test ("registering and de-registering of streamingSource") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
ssc = new StreamingContext(conf, batchDuration)
assert(ssc.getState() === StreamingContextState.INITIALIZED)
addInputStream(ssc).register()
ssc.start()

val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
assert(sources.contains(streamingSource))
assert(ssc.getState() === StreamingContextState.ACTIVE)

ssc.stop()
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
assert(ssc.getState() === StreamingContextState.STOPPED)
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
}

test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
Expand Down Expand Up @@ -811,3 +833,18 @@ package object testPackage extends Assertions {
}
}
}

/**
* Helper methods for testing StreamingContextSuite
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem
*/
private object StreamingContextSuite extends PrivateMethodTester {
private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
metricsSystem.invokePrivate(_sources())
}
private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
streamingContext.invokePrivate(_streamingSource())
}
}