Skip to content

Commit ce73c3a

Browse files
Merge pull request ReactiveX#420 from samuelgruetter/scalaadaptor
Scala Adaptor
2 parents b8ae284 + a33f9c2 commit ce73c3a

File tree

23 files changed

+1494
-802
lines changed

23 files changed

+1494
-802
lines changed

language-adaptors/rxjava-scala-java/README.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

language-adaptors/rxjava-scala-java/build.gradle

Lines changed: 0 additions & 32 deletions
This file was deleted.

language-adaptors/rxjava-scala/README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blo
6262

6363
Scala code using Rx should only import members from `rx.lang.scala` and below.
6464

65-
Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala).
65+
66+
## Documentation
67+
68+
The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable).
69+
70+
You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory.
71+
72+
Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it.
6673

6774

6875
## Binaries

language-adaptors/rxjava-scala/TODO.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ TODOs for Scala Adapter
44

55
This is a (probably incomplete) list of what still needs to be done in the Scala adaptor:
66

7-
* mirror complete Java package structure in Scala
8-
* objects for classes with static methods or singletons (e.g. Schedulers, Subscriptions)
9-
* Notification as a case class
10-
* integrating Scala Futures, should there be a common base interface for Futures and Observables?
11-
* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, exists, tails, ...
7+
* Integrating Scala Futures: Should there be a common base interface for Futures and Observables? And if all subscribers of an Observable wrapping a Future unsubscribe, the Future should be cancelled, but Futures do not support cancellation.
8+
* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, tails, ...
129
* combineLatest with arities > 2
13-
* decide where the MovieLib/MovieLibUsage (use Scala code from Java code) example should live and make sure gradle builds it in the right order
10+
* Implicit schedulers?
1411
* Avoid text duplication in scaladoc using templates, add examples, distinction between use case signature and full signature
1512
* other small TODOs
1613

1714

15+
(Implicit) schedulers for interval: Options:
16+
17+
```scala
18+
def interval(duration: Duration)(implicit scheduler: Scheduler): Observable[Long]
19+
def interval(duration: Duration)(scheduler: Scheduler): Observable[Long]
20+
def interval(scheduler: Scheduler)(duration: Duration): Observable[Long]
21+
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] && def interval(duration: Duration): Observable[Long]
22+
````

language-adaptors/rxjava-scala/build.gradle

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,30 @@ tasks.withType(ScalaCompile) {
1313
}
1414

1515
sourceSets {
16+
main {
17+
scala {
18+
srcDir 'src/main/scala'
19+
}
20+
}
1621
test {
1722
scala {
1823
srcDir 'src/main/scala'
24+
srcDir 'src/test/scala'
25+
srcDir 'src/examples/scala'
26+
srcDir 'src/examples/java'
27+
}
28+
java.srcDirs = []
29+
}
30+
examples {
31+
// It seems that in Gradle, the dependency "compileScala depends on compileJava" is hardcoded,
32+
// or at least not meant to be removed.
33+
// However, compileScala also runs javac at the very end, so we just add the Java sources to
34+
// the scala source set:
35+
scala {
36+
srcDir 'src/examples/scala'
37+
srcDir 'src/examples/java'
1938
}
39+
java.srcDirs = []
2040
}
2141
}
2242

@@ -34,6 +54,15 @@ tasks.compileScala {
3454
classpath = classpath + (configurations.compile + configurations.provided)
3555
}
3656

57+
tasks.compileExamplesScala {
58+
classpath = classpath + files(compileScala.destinationDir) + (configurations.compile + configurations.provided)
59+
}
60+
61+
// Add RxJava core to Scaladoc input:
62+
// tasks.scaladoc.source(project(':rxjava-core').tasks.getByPath(':rxjava-core:compileJava').source)
63+
// println("-------")
64+
// println(tasks.scaladoc.source.asPath)
65+
3766
task test(overwrite: true, dependsOn: testClasses) << {
3867
ant.taskdef(name: 'scalatest',
3968
classname: 'org.scalatest.tools.ScalaTestAntTask',

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala renamed to language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 92 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import rx.lang.scala._
2121
import scala.concurrent.duration._
2222
import org.junit.{Before, Test, Ignore}
2323
import org.junit.Assert._
24-
import rx.lang.scala.concurrency.NewThreadScheduler
24+
import rx.lang.scala.concurrency.Schedulers
25+
import java.io.IOException
2526

2627
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
2728
class RxScalaDemo extends JUnitSuite {
@@ -167,10 +168,10 @@ class RxScalaDemo extends JUnitSuite {
167168

168169
@Test def schedulersExample() {
169170
val o = Observable.interval(100 millis).take(8)
170-
o.observeOn(NewThreadScheduler).subscribe(
171+
o.observeOn(Schedulers.newThread).subscribe(
171172
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
172173
)
173-
o.observeOn(NewThreadScheduler).subscribe(
174+
o.observeOn(Schedulers.newThread).subscribe(
174175
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
175176
)
176177
waitFor(o)
@@ -287,7 +288,7 @@ class RxScalaDemo extends JUnitSuite {
287288
// We can't put a general average method into Observable.scala, because Scala's Numeric
288289
// does not have scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum)
289290
def doubleAverage(o: Observable[Double]): Observable[Double] = {
290-
for ((finalSum, finalCount) <- o.fold((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
291+
for ((finalSum, finalCount) <- o.foldLeft((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
291292
yield finalSum / finalCount
292293
}
293294

@@ -321,13 +322,13 @@ class RxScalaDemo extends JUnitSuite {
321322
.toBlockingObservable.foreach(println(_))
322323
}
323324

324-
// source Observables are in a List:
325-
@Test def zipManySeqExample() {
326-
val observables = List(Observable(1, 2), Observable(10, 20), Observable(100, 200))
327-
(for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")"))
325+
// source Observables are all known:
326+
@Test def zip3Example() {
327+
val o = Observable.zip(Observable(1, 2), Observable(10, 20), Observable(100, 200))
328+
(for ((n1, n2, n3) <- o) yield s"$n1, $n2 and $n3")
328329
.toBlockingObservable.foreach(println(_))
329330
}
330-
331+
331332
// source Observables are in an Observable:
332333
@Test def zipManyObservableExample() {
333334
val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200))
@@ -375,6 +376,88 @@ class RxScalaDemo extends JUnitSuite {
375376
assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single)
376377
}
377378

379+
@Test def timestampExample() {
380+
val timestamped = Observable.interval(100 millis).take(3).timestamp.toBlockingObservable
381+
for ((millis, value) <- timestamped if value > 0) {
382+
println(value + " at t = " + millis)
383+
}
384+
}
385+
386+
@Test def materializeExample1() {
387+
def printObservable[T](o: Observable[T]): Unit = {
388+
import Notification._
389+
o.materialize.subscribe(n => n match {
390+
case OnNext(v) => println("Got value " + v)
391+
case OnCompleted() => println("Completed")
392+
case OnError(err) => println("Error: " + err.getMessage)
393+
})
394+
}
395+
396+
val o1 = Observable.interval(100 millis).take(3)
397+
val o2 = Observable(new IOException("Oops"))
398+
printObservable(o1)
399+
waitFor(o1)
400+
printObservable(o2)
401+
waitFor(o2)
402+
}
403+
404+
@Test def materializeExample2() {
405+
import Notification._
406+
Observable(1, 2, 3).materialize.subscribe(n => n match {
407+
case OnNext(v) => println("Got value " + v)
408+
case OnCompleted() => println("Completed")
409+
case OnError(err) => println("Error: " + err.getMessage)
410+
})
411+
}
412+
413+
@Test def elementAtReplacement() {
414+
assertEquals("b", Observable("a", "b", "c").drop(1).first.toBlockingObservable.single)
415+
}
416+
417+
@Test def elementAtOrDefaultReplacement() {
418+
assertEquals("b", Observable("a", "b", "c").drop(1).firstOrElse("!").toBlockingObservable.single)
419+
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
420+
}
421+
422+
@Test def observableLikeFuture1() {
423+
implicit val scheduler = Schedulers.threadPoolForIO
424+
val o1 = observable {
425+
Thread.sleep(1000)
426+
5
427+
}
428+
val o2 = observable {
429+
Thread.sleep(500)
430+
4
431+
}
432+
Thread.sleep(500)
433+
val t1 = System.currentTimeMillis
434+
println((o1 merge o2).first.toBlockingObservable.single)
435+
println(System.currentTimeMillis - t1)
436+
}
437+
438+
@Test def observableLikeFuture2() {
439+
class Friend {}
440+
val session = new Object {
441+
def getFriends: List[Friend] = List(new Friend, new Friend)
442+
}
443+
444+
implicit val scheduler = Schedulers.threadPoolForIO
445+
val o: Observable[List[Friend]] = observable {
446+
session.getFriends
447+
}
448+
o.subscribe(
449+
friendList => println(friendList),
450+
err => println(err.getMessage)
451+
)
452+
453+
Thread.sleep(1500) // or convert to BlockingObservable
454+
}
455+
456+
@Test def takeWhileWithIndexAlternative {
457+
val condition = true
458+
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
459+
}
460+
378461
def output(s: String): Unit = println(s)
379462

380463
// blocks until obs has completed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,56 +19,52 @@ import java.{ lang => jlang }
1919
import rx.util.functions._
2020

2121
/**
22-
* These function conversions convert between Scala functions and Rx Funcs and Actions.
23-
* Most users RxScala won't need them, but they might be useful if one wants to use
24-
* the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants
25-
* to use a Java library taking/returning Funcs and Actions.
22+
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
23+
* Most RxScala users won't need them, but they might be useful if one wants to use
24+
* the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants
25+
* to use a Java library taking/returning `Func`s and `Action`s.
2626
*/
2727
object ImplicitFunctionConversions {
2828
import language.implicitConversions
2929

30+
implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
31+
new Func2[rx.Scheduler, T, Subscription] {
32+
def call(s: rx.Scheduler, t: T): Subscription = {
33+
action(s, t)
34+
}
35+
}
36+
37+
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJava
38+
39+
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)
40+
3041
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
3142
new rx.Observable.OnSubscribeFunc[T] {
32-
def onSubscribe(obs: Observer[_ >: T]): Subscription = {
43+
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
3344
f(obs)
3445
}
3546
}
3647

37-
/**
38-
* Converts a by-name parameter to a Rx Func0
39-
*/
4048
implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
4149
new Func0[B] {
4250
def call(): B = param
4351
}
4452

45-
/**
46-
* Converts 0-arg function to Rx Action0
47-
*/
4853
implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 =
4954
new Action0 {
5055
def call(): Unit = f()
5156
}
5257

53-
/**
54-
* Converts 1-arg function to Rx Action1
55-
*/
5658
implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] =
5759
new Action1[A] {
5860
def call(a: A): Unit = f(a)
5961
}
6062

61-
/**
62-
* Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean]
63-
*/
6463
implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] =
6564
new Func1[A, jlang.Boolean] {
6665
def call(a: A): jlang.Boolean = f(a).booleanValue
6766
}
6867

69-
/**
70-
* Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean]
71-
*/
7268
implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] =
7369
new Func2[A, B, jlang.Boolean] {
7470
def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue
@@ -79,34 +75,21 @@ object ImplicitFunctionConversions {
7975
def call(args: java.lang.Object*): R = f(args)
8076
}
8177

82-
/**
83-
* Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2
84-
*/
8578
implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] =
8679
new Func2[A, jlang.Integer, jlang.Boolean] {
8780
def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue
8881
}
8982

90-
/**
91-
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
92-
*/
9383
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
9484
new Func2[A, A, jlang.Integer] {
9585
def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue
9686
}
9787

98-
/**
99-
* This implicit allows Scala code to use any exception type and still work
100-
* with invariant Func1 interface
101-
*/
10288
implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] =
10389
new Func1[Exception, B] {
10490
def call(ex: Exception): B = f(ex.asInstanceOf[A])
10591
}
10692

107-
/**
108-
* The following implicits convert functions of different arities into the Rx equivalents
109-
*/
11093
implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] =
11194
new Func0[A] {
11295
def call(): A = f()

0 commit comments

Comments
 (0)