-
Notifications
You must be signed in to change notification settings - Fork 28.7k
Update RecoverableNetworkWordCount.scala #2735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Trying this example, I missed the moment when the checkpoint was iniciated
Can one of the admins verify this patch? |
1 similar comment
Can one of the admins verify this patch? |
Jenkins, this is ok to test. |
QA tests have started for PR 2735 at commit
|
QA tests have finished for PR 2735 at commit
|
Test PASSed. |
@@ -116,6 +116,7 @@ object RecoverableNetworkWordCount { | |||
() => { | |||
createContext(ip, port, outputPath) | |||
}) | |||
ssc.checkpoint(checkpointDirectory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The right place to add this is in the createContext function, as all the setting up (including the setting up of checkpoint directory) should be in that function. Could you update this PR based on that?
Thank you for catching this. The example is almost useless without that! I had one comment, please address that. Other than I tested the change manually (since unit tests dont cover this) and it is good. |
Also, please make a JIRA and update the title with the JIRA (see other pull requests) |
I made this change in #2564 too. You could kinda think of this as part of the SPARK-2548 JIRA as well as it includes some touch-ups to the network word count examples. |
Ok, I've added ssc.checkpoint(checkpointDirectory) to createContext. First, I wasn't sure that the checkpoin is initiated when the context is recreated from checkpoinDirector. That's why I put it outside createContext.
Test build #23097 has started for PR 2735 at commit
|
There is one more issue in this example. It could be the lack of my understanding, or maybe a problem. When the checkpoint exists and you send some lines to nc while the streaming is off, after restarting streaming those line are recovered. But when you kill/quit streaming, everything within that particular cycle is lost and cannot be recovered. Is it as it should be? |
Test build #23097 has finished for PR 2735 at commit
|
Test PASSed. |
@@ -114,7 +115,7 @@ object RecoverableNetworkWordCount { | |||
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args | |||
val ssc = StreamingContext.getOrCreate(checkpointDirectory, | |||
() => { | |||
createContext(ip, port, outputPath) | |||
createContext(ip, port, outputPath, checkpointDirectory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdas Can I double-check that it's correct to call StreamingContext.checkpoint
only within the "create context" function? as opposed to always calling it on the result of StreamingContext.getOrCreate
? That is, if it reads checkpoint data, it already configures itself to continue using that checkpoint directory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, here is the piece of code that does it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guess things like this should be covered in some unit test. Will add later.
@comcmipi Yes, in the current version, if you quit the streaming application (aka driver) quits, then the data received by receivers but not yet processed gets lost. Thats why we have add an experimental feature (already added to master, to be released with Spark 1.2) where the received data will be stored to a log file in HDFS, so that no data is lost. |
Here's my attempt to re-port `RecoverableNetworkWordCount` to Java, following the example of its Scala and Java siblings. I fixed a few minor doc/formatting issues along the way I believe. Author: Sean Owen <[email protected]> Closes #2564 from srowen/SPARK-2548 and squashes the following commits: 0d0bf29 [Sean Owen] Update checkpoint call as in #2735 35f23e3 [Sean Owen] Remove old comment about running in standalone mode 179b3c2 [Sean Owen] Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples (cherry picked from commit 3a02d41) Signed-off-by: Tathagata Das <[email protected]>
Here's my attempt to re-port `RecoverableNetworkWordCount` to Java, following the example of its Scala and Java siblings. I fixed a few minor doc/formatting issues along the way I believe. Author: Sean Owen <[email protected]> Closes #2564 from srowen/SPARK-2548 and squashes the following commits: 0d0bf29 [Sean Owen] Update checkpoint call as in #2735 35f23e3 [Sean Owen] Remove old comment about running in standalone mode 179b3c2 [Sean Owen] Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples (cherry picked from commit 3a02d41) Signed-off-by: Tathagata Das <[email protected]>
Here's my attempt to re-port `RecoverableNetworkWordCount` to Java, following the example of its Scala and Java siblings. I fixed a few minor doc/formatting issues along the way I believe. Author: Sean Owen <[email protected]> Closes #2564 from srowen/SPARK-2548 and squashes the following commits: 0d0bf29 [Sean Owen] Update checkpoint call as in #2735 35f23e3 [Sean Owen] Remove old comment about running in standalone mode 179b3c2 [Sean Owen] Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples
@tdas Thanks for clarifying it. |
Tested this manually, works perfectly. Merging this. Thanks for this! |
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <[email protected]> Closes #2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <[email protected]> Closes #2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala (cherry picked from commit 0340c56) Signed-off-by: Tathagata Das <[email protected]>
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <[email protected]> Closes apache#2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <[email protected]> Closes apache#2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <[email protected]> Closes apache#2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
Here's my attempt to re-port `RecoverableNetworkWordCount` to Java, following the example of its Scala and Java siblings. I fixed a few minor doc/formatting issues along the way I believe. Author: Sean Owen <[email protected]> Closes #2564 from srowen/SPARK-2548 and squashes the following commits: 0d0bf29 [Sean Owen] Update checkpoint call as in apache/spark#2735 35f23e3 [Sean Owen] Remove old comment about running in standalone mode 179b3c2 [Sean Owen] Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples
Trying this example, I missed the moment when the checkpoint was iniciated