Skip to content

Update code to use Spark 3.0 & Scala 2.12 #282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 22, 2021
Merged

Conversation

alexott
Copy link
Contributor

@alexott alexott commented Jan 16, 2021

Most of tests are passing, except the read with offset option - I suspect that it's caused by incorrect usage of the SparkContext inside the test, so context is stopped, leading to eviction of the blocks.

test log is attached: test.log

@areese
Copy link

areese commented Jan 20, 2021

@alexott I managed to get the tests to pass by increasing the timeout in src/test/scala/org/apache/spark/sql/redis/stream/RedisStreamSourceSuite.scala:231 from 50 milliseconds to 5000 milliseconds. ;)

@alexott
Copy link
Contributor Author

alexott commented Jan 20, 2021

@areese even 2 seconds is enough on my old laptop... I've tried to do while(!query.isActive) {Thread.sleep(50)} but it didn't help

@alexott alexott mentioned this pull request Jan 20, 2021
@areese
Copy link

areese commented Jan 20, 2021

@alexott I used 5000 cause it was simple. ;).
I think the issue with the loop is that the code closes the SparkContext before the query has been emitted.

I bet sleeping for 1s, and then doing the query is active might work.

@jeisinge
Copy link

In some of our streaming code, we also need to wait for the state of streams. I figured I would share a bit of that code here --- maybe it would be useful for you all?

def untilStreamIsReady(name:String):Unit = {
  val queries = spark.streams.active.filter(_.name == name)

  if (queries.length == 0) {
    println("The stream is not active.")
  } else {
    while (queries(0).isActive && queries(0).recentProgress.length == 0) {
      // wait until there is any type of progress
    }

    if (queries(0).isActive) {
      queries(0).awaitTermination(5*1000)
      println("The stream is active and ready.")
    } else {
      println("The stream is not active.")
    }
  }
}

@fe2s fe2s self-requested a review January 25, 2021 20:46
fe2s
fe2s previously approved these changes Jan 25, 2021
@fe2s
Copy link
Contributor

fe2s commented Jan 25, 2021

Thanks @alexott, the PR looks good!
We will merge the master into branch-2.4 and merge this PR after that.

@fe2s
Copy link
Contributor

fe2s commented Jul 22, 2021

Hi @alexott, sorry, it's been a long time since this PR was created, but we sorted out some other PR that blocked this one just now. Would you still be able to merge the conflict here?

@alexott
Copy link
Contributor Author

alexott commented Jul 22, 2021

@fe2s I've rebased this branch onto master

@fe2s fe2s merged commit 49c5284 into RedisLabs:master Jul 22, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants