Skip to content

Conversation

tdas
Copy link
Contributor

@tdas tdas commented Oct 12, 2014

@harishreedharan @pwendell
See JIRA for diagnosis of the problem
https://issues.apache.org/jira/browse/SPARK-3912

The solution was to reimplement it.

  1. Find a free port (by binding and releasing a server-scoket), and then use that port
  2. Remove thread.sleep()s, instead repeatedly try to create a sender and send data and check whether data was sent. Use eventually() to minimize waiting time.
  3. Check whether all the data was received, without caring about batches.

@SparkQA
Copy link

SparkQA commented Oct 12, 2014

QA tests have started for PR 2773 at commit 93cd7f6.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 12, 2014

QA tests have finished for PR 2773 at commit 93cd7f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21650/
Test PASSed.

@tdas
Copy link
Contributor Author

tdas commented Oct 12, 2014

Jenkins, test this please.

@SparkQA
Copy link

SparkQA commented Oct 12, 2014

QA tests have started for PR 2773 at commit 93cd7f6.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 12, 2014

QA tests have finished for PR 2773 at commit 93cd7f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21659/
Test PASSed.

while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
Thread.sleep(100)
eventually(timeout(10 seconds), interval(100 milliseconds)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to retry this test multiple times? The usual case where the test fails is mainly because of bind issues, correct? Since findFreePort (sort of) takes care of that..this does not seem to help.

There is a small race condition that can be taken care of, using eventually though - where the free port is taken before the bind, in which case we can use a new free port, by calling findFreePort inside the eventually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I found that a lot of time the test was failing because of the uncertainty on when the Flume receiver is ready to receive the new connection. Even after the connection gets accepted, sending data does not return Status.OK (and the data that streaming receives has empty strings). I am not sure what is the reason behind this but this seems like a fairly robust way to send all the data once in a single shot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, sounds good!

@harishreedharan
Copy link
Contributor

+1. This looks good.

@asfgit asfgit closed this in 4d26aca Oct 14, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants