Skip to content

[SPARK-8743] [Streaming]: Deregister Codahale metrics for streaming when StreamingContext is closed #7250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
92fa04b
SPARK-8743: Added the registerSource method call to the start method …
Jul 7, 2015
a665965
Added // instead of /** for commenting in code
Jul 7, 2015
7621adf
Added indentation and Space at the comment on line 578; Registering..
Jul 7, 2015
18bcc7e
Added test case for de-register metrics and made a change to the scop…
Jul 7, 2015
e4f00d7
Added additional variable to check the updated Sources size value to …
Jul 8, 2015
f5e47e0
Added the removeSource method in try
Jul 9, 2015
d04fd2a
Removed the assert for the env field, added the registerSource line i…
Jul 9, 2015
e2c3bf8
Added test to check registering and de-registering of streamingSource
Jul 9, 2015
742398c
Removed unused imports
Jul 9, 2015
ca081fa
Moved the registerSource() call before line 601
Jul 10, 2015
33a2091
Changed scope of sources and corrected comments for helper
Jul 10, 2015
a67918c
Removed extra line in Helper Methods section
Jul 10, 2015
74598ce
Added helper method for private methods and changed the test logic to…
Jul 11, 2015
e37a2f3
Changed import statements to remove unnecessary imports and add speci…
Jul 12, 2015
f54afcf
Removed types for fields in test for registering and deregistering me…
Jul 12, 2015
ea0dc1a
Changed imports statements, negated test statement and removed postfix
Jul 12, 2015
a0f1950
Removed added comment to Assert for INITIALIZED state
Jul 12, 2015
2a81287
Removing the INITIALIZED check since after start() the state moves to…
Jul 12, 2015
5d3af31
Move the INITIALIZED state check to when the ssc is initialized
Jul 12, 2015
299a57d
SPARK-8743: Added the registerSource method call to the start method …
Jul 7, 2015
b86e80b
Merge branch 'SPARK-8743' of https://github.com/nssalian/spark into S…
Jul 12, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,9 @@ class StreamingContext private[streaming] (
None
}

/** Register streaming source to metrics system */
/** Initializing a streaming source to help register metrics system */
private val streamingSource = new StreamingSource(this)
assert(env != null)
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)


private var state: StreamingContextState = INITIALIZED

Expand Down Expand Up @@ -598,6 +596,9 @@ class StreamingContext private[streaming] (
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
Expand Down Expand Up @@ -674,6 +675,8 @@ class StreamingContext private[streaming] (
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// De-registering Streaming Metrics of the StreamingContext
env.metricsSystem.removeSource(streamingSource)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here remove the source in ACTIVE state may introduce some corner case problem. For example, since metrics source is added when StreamingContext's state is INITIALIZED, if we met exception at this point, we will never change the state into ACTIVE, so metrics source cannot be removed, since you only assume the state is ACTIVE to remove the source. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to register at the call of the start().
So, based on your comment, that would mean registering the sources after the state is set to INITIALIZEDand before.

def start(): Unit = synchronized { // Registering Streaming Metrics at the start of the StreamingContext assert(env != null) assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource)

Makes sense to have it after INITIALIZED and before the synchronized block of ACTIVE and STOPPED.
@tdas, could add more light.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is to move this line out of ACTIVE condition, In any case you have to deregister the metrics source, not only in ACTIVE state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerryshao In this PR the streaming metric source is added only after starting the context. So it is fine to deregister only when the state is ACTIVE. Also that maintain the characteristic that stop is idempotent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is OK for normal case, but what if an exception is met after metrics is successfully registered, but before changing the state into ACTIVE, according to the code, it will try the exception and change state into STOPPED, so at that situation, if we call stop(), we will never de-register the metrics source according to the current implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point. I suggested above that the registering be moved after the ACTIVE status change.

uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
Expand All @@ -688,6 +691,7 @@ class StreamingContext private[streaming] (
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ package org.apache.spark.streaming

import java.io.{File, NotSerializableException}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.commons.io.FileUtils
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need to be with other spark imports, and the spark imports shouldn't have a blank line in the middle.

import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
import org.scalatest.{Assertions, BeforeAndAfter}

import org.scalatest.{PrivateMethodTester, Assertions, BeforeAndAfter}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
import scala.collection.mutable.ArrayBuffer


class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
Expand Down Expand Up @@ -297,6 +298,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
Thread.sleep(100)
}

test("registering and de-registering of streamingSource") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
ssc = new StreamingContext(conf, batchDuration)
assert(ssc.getState() === StreamingContextState.INITIALIZED)
addInputStream(ssc).register()
ssc.start()

val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
assert(sources.contains(streamingSource))
assert(ssc.getState() === StreamingContextState.ACTIVE)
Thread.sleep(100)

ssc.stop()
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
assert(ssc.getState() === StreamingContextState.STOPPED)
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
}

test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
Expand Down Expand Up @@ -796,3 +817,21 @@ package object testPackage extends Assertions {
}
}
}

/**
* Helper methods for testing StreamingContextSuite.
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem.
*/
private object StreamingContextSuite extends PrivateMethodTester {
private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
metricsSystem.invokePrivate(_sources())
}
private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
streamingContext.invokePrivate(_streamingSource())
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra empty line