diff --git a/language-adaptors/rxjava-scala-java/README.md b/language-adaptors/rxjava-scala-java/README.md deleted file mode 100644 index 54d7086366..0000000000 --- a/language-adaptors/rxjava-scala-java/README.md +++ /dev/null @@ -1,5 +0,0 @@ - -rxjava-scala-java ------------------ - -Contains examples illustrating how RxScala code can be used from Java. diff --git a/language-adaptors/rxjava-scala-java/build.gradle b/language-adaptors/rxjava-scala-java/build.gradle deleted file mode 100644 index d6be5aaeb7..0000000000 --- a/language-adaptors/rxjava-scala-java/build.gradle +++ /dev/null @@ -1,32 +0,0 @@ - -apply plugin: 'osgi' - - -project(':language-adaptors:rxjava-scala-java') { - //sourceSets.test.java.srcDir 'src/examples/java' - sourceSets.main.java.srcDir 'src/main/java' -} - -dependencies { - compile 'org.scala-lang:scala-library:2.10.+' - - compile project(':rxjava-core') - - compile project(':language-adaptors:rxjava-scala') - - provided 'junit:junit-dep:4.10' - provided 'org.mockito:mockito-core:1.8.5' - provided 'org.scalatest:scalatest_2.10:1.9.1' -} - -jar { - manifest { - name = 'rxjava-scala-java' - instruction 'Bundle-Vendor', 'Netflix' - instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' - instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' - instruction 'Fragment-Host', 'com.netflix.rxjava.core' - } -} - - diff --git a/language-adaptors/rxjava-scala/README.md b/language-adaptors/rxjava-scala/README.md index c4ad66d0af..05160e8ad9 100644 --- a/language-adaptors/rxjava-scala/README.md +++ b/language-adaptors/rxjava-scala/README.md @@ -62,7 +62,14 @@ For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blo Scala code using Rx should only import members from `rx.lang.scala` and below. -Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala). + +## Documentation + +The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable). + +You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory. + +Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it. ## Binaries diff --git a/language-adaptors/rxjava-scala/TODO.md b/language-adaptors/rxjava-scala/TODO.md index d4136236da..a3f4b8fd53 100644 --- a/language-adaptors/rxjava-scala/TODO.md +++ b/language-adaptors/rxjava-scala/TODO.md @@ -4,14 +4,19 @@ TODOs for Scala Adapter This is a (probably incomplete) list of what still needs to be done in the Scala adaptor: -* mirror complete Java package structure in Scala -* objects for classes with static methods or singletons (e.g. Schedulers, Subscriptions) -* Notification as a case class -* integrating Scala Futures, should there be a common base interface for Futures and Observables? -* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, exists, tails, ... +* Integrating Scala Futures: Should there be a common base interface for Futures and Observables? And if all subscribers of an Observable wrapping a Future unsubscribe, the Future should be cancelled, but Futures do not support cancellation. +* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, tails, ... * combineLatest with arities > 2 -* decide where the MovieLib/MovieLibUsage (use Scala code from Java code) example should live and make sure gradle builds it in the right order +* Implicit schedulers? * Avoid text duplication in scaladoc using templates, add examples, distinction between use case signature and full signature * other small TODOs +(Implicit) schedulers for interval: Options: + +```scala +def interval(duration: Duration)(implicit scheduler: Scheduler): Observable[Long] +def interval(duration: Duration)(scheduler: Scheduler): Observable[Long] +def interval(scheduler: Scheduler)(duration: Duration): Observable[Long] +def interval(duration: Duration, scheduler: Scheduler): Observable[Long] && def interval(duration: Duration): Observable[Long] +```` diff --git a/language-adaptors/rxjava-scala/build.gradle b/language-adaptors/rxjava-scala/build.gradle index 753c2749e1..db194a6d28 100644 --- a/language-adaptors/rxjava-scala/build.gradle +++ b/language-adaptors/rxjava-scala/build.gradle @@ -13,10 +13,30 @@ tasks.withType(ScalaCompile) { } sourceSets { + main { + scala { + srcDir 'src/main/scala' + } + } test { scala { srcDir 'src/main/scala' + srcDir 'src/test/scala' + srcDir 'src/examples/scala' + srcDir 'src/examples/java' + } + java.srcDirs = [] + } + examples { + // It seems that in Gradle, the dependency "compileScala depends on compileJava" is hardcoded, + // or at least not meant to be removed. + // However, compileScala also runs javac at the very end, so we just add the Java sources to + // the scala source set: + scala { + srcDir 'src/examples/scala' + srcDir 'src/examples/java' } + java.srcDirs = [] } } @@ -34,6 +54,15 @@ tasks.compileScala { classpath = classpath + (configurations.compile + configurations.provided) } +tasks.compileExamplesScala { + classpath = classpath + files(compileScala.destinationDir) + (configurations.compile + configurations.provided) +} + +// Add RxJava core to Scaladoc input: +// tasks.scaladoc.source(project(':rxjava-core').tasks.getByPath(':rxjava-core:compileJava').source) +// println("-------") +// println(tasks.scaladoc.source.asPath) + task test(overwrite: true, dependsOn: testClasses) << { ant.taskdef(name: 'scalatest', classname: 'org.scalatest.tools.ScalaTestAntTask', diff --git a/language-adaptors/rxjava-scala-java/src/main/java/rx/lang/scala/examples/MovieLibUsage.java b/language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java similarity index 100% rename from language-adaptors/rxjava-scala-java/src/main/java/rx/lang/scala/examples/MovieLibUsage.java rename to language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/MovieLib.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/MovieLib.scala similarity index 100% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/MovieLib.scala rename to language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/MovieLib.scala diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala similarity index 100% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala rename to language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala similarity index 82% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala rename to language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index c01528fb7a..fe1747a1e6 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -21,7 +21,8 @@ import rx.lang.scala._ import scala.concurrent.duration._ import org.junit.{Before, Test, Ignore} import org.junit.Assert._ -import rx.lang.scala.concurrency.NewThreadScheduler +import rx.lang.scala.concurrency.Schedulers +import java.io.IOException @Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { @@ -167,10 +168,10 @@ class RxScalaDemo extends JUnitSuite { @Test def schedulersExample() { val o = Observable.interval(100 millis).take(8) - o.observeOn(NewThreadScheduler).subscribe( + o.observeOn(Schedulers.newThread).subscribe( i => println(s"${i}a (on thread #${Thread.currentThread().getId()})") ) - o.observeOn(NewThreadScheduler).subscribe( + o.observeOn(Schedulers.newThread).subscribe( i => println(s"${i}b (on thread #${Thread.currentThread().getId()})") ) waitFor(o) @@ -287,7 +288,7 @@ class RxScalaDemo extends JUnitSuite { // We can't put a general average method into Observable.scala, because Scala's Numeric // does not have scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum) def doubleAverage(o: Observable[Double]): Observable[Double] = { - for ((finalSum, finalCount) <- o.fold((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)})) + for ((finalSum, finalCount) <- o.foldLeft((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)})) yield finalSum / finalCount } @@ -321,13 +322,13 @@ class RxScalaDemo extends JUnitSuite { .toBlockingObservable.foreach(println(_)) } - // source Observables are in a List: - @Test def zipManySeqExample() { - val observables = List(Observable(1, 2), Observable(10, 20), Observable(100, 200)) - (for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")")) + // source Observables are all known: + @Test def zip3Example() { + val o = Observable.zip(Observable(1, 2), Observable(10, 20), Observable(100, 200)) + (for ((n1, n2, n3) <- o) yield s"$n1, $n2 and $n3") .toBlockingObservable.foreach(println(_)) } - + // source Observables are in an Observable: @Test def zipManyObservableExample() { val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200)) @@ -375,6 +376,88 @@ class RxScalaDemo extends JUnitSuite { assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single) } + @Test def timestampExample() { + val timestamped = Observable.interval(100 millis).take(3).timestamp.toBlockingObservable + for ((millis, value) <- timestamped if value > 0) { + println(value + " at t = " + millis) + } + } + + @Test def materializeExample1() { + def printObservable[T](o: Observable[T]): Unit = { + import Notification._ + o.materialize.subscribe(n => n match { + case OnNext(v) => println("Got value " + v) + case OnCompleted() => println("Completed") + case OnError(err) => println("Error: " + err.getMessage) + }) + } + + val o1 = Observable.interval(100 millis).take(3) + val o2 = Observable(new IOException("Oops")) + printObservable(o1) + waitFor(o1) + printObservable(o2) + waitFor(o2) + } + + @Test def materializeExample2() { + import Notification._ + Observable(1, 2, 3).materialize.subscribe(n => n match { + case OnNext(v) => println("Got value " + v) + case OnCompleted() => println("Completed") + case OnError(err) => println("Error: " + err.getMessage) + }) + } + + @Test def elementAtReplacement() { + assertEquals("b", Observable("a", "b", "c").drop(1).first.toBlockingObservable.single) + } + + @Test def elementAtOrDefaultReplacement() { + assertEquals("b", Observable("a", "b", "c").drop(1).firstOrElse("!").toBlockingObservable.single) + assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single) + } + + @Test def observableLikeFuture1() { + implicit val scheduler = Schedulers.threadPoolForIO + val o1 = observable { + Thread.sleep(1000) + 5 + } + val o2 = observable { + Thread.sleep(500) + 4 + } + Thread.sleep(500) + val t1 = System.currentTimeMillis + println((o1 merge o2).first.toBlockingObservable.single) + println(System.currentTimeMillis - t1) + } + + @Test def observableLikeFuture2() { + class Friend {} + val session = new Object { + def getFriends: List[Friend] = List(new Friend, new Friend) + } + + implicit val scheduler = Schedulers.threadPoolForIO + val o: Observable[List[Friend]] = observable { + session.getFriends + } + o.subscribe( + friendList => println(friendList), + err => println(err.getMessage) + ) + + Thread.sleep(1500) // or convert to BlockingObservable + } + + @Test def takeWhileWithIndexAlternative { + val condition = true + Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1) + } + def output(s: String): Unit = println(s) // blocks until obs has completed diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala index 80adb8d22a..5db1c673f6 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -19,56 +19,52 @@ import java.{ lang => jlang } import rx.util.functions._ /** - * These function conversions convert between Scala functions and Rx Funcs and Actions. - * Most users RxScala won't need them, but they might be useful if one wants to use - * the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants - * to use a Java library taking/returning Funcs and Actions. + * These function conversions convert between Scala functions and Rx `Func`s and `Action`s. + * Most RxScala users won't need them, but they might be useful if one wants to use + * the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants + * to use a Java library taking/returning `Func`s and `Action`s. */ object ImplicitFunctionConversions { import language.implicitConversions + implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) = + new Func2[rx.Scheduler, T, Subscription] { + def call(s: rx.Scheduler, t: T): Subscription = { + action(s, t) + } + } + + implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJava + + implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s) + implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) = new rx.Observable.OnSubscribeFunc[T] { - def onSubscribe(obs: Observer[_ >: T]): Subscription = { + def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = { f(obs) } } - /** - * Converts a by-name parameter to a Rx Func0 - */ implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] = new Func0[B] { def call(): B = param } - /** - * Converts 0-arg function to Rx Action0 - */ implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 = new Action0 { def call(): Unit = f() } - /** - * Converts 1-arg function to Rx Action1 - */ implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] = new Action1[A] { def call(a: A): Unit = f(a) } - /** - * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean] - */ implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] = new Func1[A, jlang.Boolean] { def call(a: A): jlang.Boolean = f(a).booleanValue } - /** - * Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean] - */ implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] = new Func2[A, B, jlang.Boolean] { def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue @@ -79,34 +75,21 @@ object ImplicitFunctionConversions { def call(args: java.lang.Object*): R = f(args) } - /** - * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2 - */ implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] = new Func2[A, jlang.Integer, jlang.Boolean] { def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue } - /** - * Converts a function shaped ilke compareTo into the equivalent Rx Func2 - */ implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] = new Func2[A, A, jlang.Integer] { def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue } - /** - * This implicit allows Scala code to use any exception type and still work - * with invariant Func1 interface - */ implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] = new Func1[Exception, B] { def call(ex: Exception): B = f(ex.asInstanceOf[A]) } - /** - * The following implicits convert functions of different arities into the Rx equivalents - */ implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] = new Func0[A] { def call(): A = f() diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala new file mode 100644 index 0000000000..27fb82a69e --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala @@ -0,0 +1,66 @@ +package rx.lang.scala + +/** + * Emitted by Observables returned by [[Observable.materialize]]. + */ +sealed trait Notification[+T] { + def asJava: rx.Notification[_ <: T] +} + +/** + * Provides pattern matching support and constructors for Notifications. + * + * Example: + * {{{ + * import Notification._ + * Observable(1, 2, 3).materialize.subscribe(n => n match { + * case OnNext(v) => println("Got value " + v) + * case OnCompleted() => println("Completed") + * case OnError(err) => println("Error: " + err.getMessage) + * }) + * }}} + */ +object Notification { + + def apply[T](n: rx.Notification[_ <: T]): Notification[T] = n.getKind match { + case rx.Notification.Kind.OnNext => new OnNext(n) + case rx.Notification.Kind.OnCompleted => new OnCompleted(n) + case rx.Notification.Kind.OnError => new OnError(n) + } + + // OnNext, OnError, OnCompleted are not case classes because we don't want pattern matching + // to extract the rx.Notification + + class OnNext[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { + def value: T = asJava.getValue + } + + object OnNext { + def unapply[U](n: Notification[U]): Option[U] = n match { + case n2: OnNext[U] => Some(n.asJava.getValue) + case _ => None + } + } + + class OnError[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { + def error: Throwable = asJava.getThrowable() + } + + object OnError { + def unapply[U](n: Notification[U]): Option[Throwable] = n match { + case n2: OnError[U] => Some(n2.asJava.getThrowable) + case _ => None + } + } + + class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {} + + object OnCompleted { + def unapply[U](n: Notification[U]): Option[Unit] = n match { + case n2: OnCompleted[U] => Some() + case _ => None + } + } + +} + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 8ff0ecd437..d845a65fa5 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,13 +14,63 @@ * limitations under the License. */ - package rx.lang.scala /** * The Observable interface that implements the Reactive Pattern. + * + * @param asJava the underlying Java observable + * + * @define subscribeObserverMain + * Call this method to subscribe an [[Observer]] for receiving + * items and notifications from the Observable. + * + * A typical implementation of `subscribe` does the following: + * + * It stores a reference to the Observer in a collection object, such as a `List[T]` object. + * + * It returns a reference to the [[Subscription]] interface. This enables Observers to + * unsubscribe, that is, to stop receiving items and notifications before the Observable stops + * sending them, which also invokes the Observer's [[Observer.onCompleted onCompleted]] method. + * + * An `Observable[T]` instance is responsible for accepting all subscriptions + * and notifying all Observers. Unless the documentation for a particular + * `Observable[T]` implementation indicates otherwise, Observers should make no + * assumptions about the order in which multiple Observers will receive their notifications. + * + * @define subscribeObserverParamObserver + * the observer + * @define subscribeObserverParamScheduler + * the [[Scheduler]] on which Observers subscribe to the Observable + * @define subscribeAllReturn + * a [[Subscription]] reference whose `unsubscribe` method can be called to stop receiving items + * before the Observable has finished sending them + * + * @define subscribeCallbacksMainWithNotifications + * Call this method to receive items and notifications from this observable. + * + * @define subscribeCallbacksMainNoNotifications + * Call this method to receive items from this observable. + * + * @define subscribeCallbacksParamOnNext + * this function will be called whenever the Observable emits an item + * @define subscribeCallbacksParamOnError + * this function will be called if an error occurs + * @define subscribeCallbacksParamOnComplete + * this function will be called when this Observable has finished emitting items + * @define subscribeCallbacksParamScheduler + * the scheduler to use + * + * @define debounceVsThrottle + * Information on debounce vs throttle: + * - [[http://drupalmotion.com/article/debounce-and-throttle-visual-explanation]] + * - [[http://unscriptable.com/2009/03/20/debouncing-javascript-methods/]] + * - [[http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/]] + * + * */ -class Observable[+T](val asJava: rx.Observable[_ <: T]) +// constructor is private because users should use apply in companion +class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) // Uncommenting this line combined with `new Observable(...)` instead of `new Observable[T](...)` // makes the compiler crash extends AnyVal @@ -30,106 +80,111 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) import scala.concurrent.duration.{Duration, TimeUnit} import rx.{Observable => JObservable} import rx.util.functions._ - import rx.lang.scala.{Notification, Subscription, Scheduler, Observer} import rx.lang.scala.util._ import rx.lang.scala.subjects.Subject import rx.lang.scala.observables.BlockingObservable import rx.lang.scala.ImplicitFunctionConversions._ + + /** + * $subscribeObserverMain + * + * @param observer $subscribeObserverParamObserver + * @param scheduler $subscribeObserverParamScheduler + * @return $subscribeAllReturn + */ + def subscribe(observer: Observer[T], scheduler: Scheduler): Subscription = { + asJava.subscribe(observer, scheduler) + } /** - * An {@link Observer} must call an Observable's {@code subscribe} method in order to - * receive items and notifications from the Observable. - * - *
A typical implementation of {@code subscribe} does the following: - *
- * It stores a reference to the Observer in a collection object, such as a {@code List
- * It returns a reference to the {@link Subscription} interface. This enables Observers to
- * unsubscribe, that is, to stop receiving items and notifications before the Observable stops
- * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} method.
- *
- * An
- *
- * @param observer
- * the observer
- * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items
- * before the Observable has finished sending them
- * @throws IllegalArgumentException
- * if the {@link Observer} provided as the argument to {@code subscribe()} is {@code null}
+ * $subscribeObserverMain
+ *
+ * @param observer $subscribeObserverParamObserver
+ * @return $subscribeAllReturn
*/
def subscribe(observer: Observer[T]): Subscription = {
asJava.subscribe(observer)
}
-
+
/**
- * An {@link Observer} must call an Observable's {@code subscribe} method in order to
- * receive items and notifications from the Observable.
- *
- * A typical implementation of {@code subscribe} does the following:
- *
- * It stores a reference to the Observer in a collection object, such as a {@code List
- * It returns a reference to the {@link Subscription} interface. This enables Observers to
- * unsubscribe, that is, to stop receiving items and notifications before the Observable stops
- * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} method.
- *
- * An {@code Observable
- *
- * @param observer
- * the observer
- * @param scheduler
- * the {@link Scheduler} on which Observers subscribe to the Observable
- * @return a {@link Subscription} reference with which Observers can stop receiving items and
- * notifications before the Observable has finished sending them
- * @throws IllegalArgumentException
- * if an argument to {@code subscribe()} is {@code null}
+ * $subscribeCallbacksMainNoNotifications
+ *
+ * @param onNext $subscribeCallbacksParamOnNext
+ * @return $subscribeAllReturn
*/
- def subscribe(observer: Observer[T], scheduler: Scheduler): Subscription = {
- asJava.subscribe(observer, scheduler)
- }
-
def subscribe(onNext: T => Unit): Subscription = {
asJava.subscribe(onNext)
}
+ /**
+ * $subscribeCallbacksMainWithNotifications
+ *
+ * @param onNext $subscribeCallbacksParamOnNext
+ * @param onError $subscribeCallbacksParamOnError
+ * @return $subscribeAllReturn
+ */
def subscribe(onNext: T => Unit, onError: Throwable => Unit): Subscription = {
asJava.subscribe(onNext, onError)
}
-
+
+ /**
+ * $subscribeCallbacksMainWithNotifications
+ *
+ * @param onNext $subscribeCallbacksParamOnNext
+ * @param onError $subscribeCallbacksParamOnError
+ * @param onComplete $subscribeCallbacksParamOnComplete
+ * @return $subscribeAllReturn
+ */
def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Subscription = {
asJava.subscribe(onNext, onError, onComplete)
}
-
+
+ /**
+ * $subscribeCallbacksMainWithNotifications
+ *
+ * @param onNext $subscribeCallbacksParamOnNext
+ * @param onError $subscribeCallbacksParamOnError
+ * @param onComplete $subscribeCallbacksParamOnComplete
+ * @param scheduler $subscribeCallbacksParamScheduler
+ * @return $subscribeAllReturn
+ */
def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit, scheduler: Scheduler): Subscription = {
asJava.subscribe(onNext, onError, onComplete, scheduler)
}
+ /**
+ * $subscribeCallbacksMainWithNotifications
+ *
+ * @param onNext $subscribeCallbacksParamOnNext
+ * @param onError $subscribeCallbacksParamOnError
+ * @param scheduler $subscribeCallbacksParamScheduler
+ * @return $subscribeAllReturn
+ */
def subscribe(onNext: T => Unit, onError: Throwable => Unit, scheduler: Scheduler): Subscription = {
asJava.subscribe(onNext, onError, scheduler)
}
-
+
+ /**
+ * $subscribeCallbacksMainNoNotifications
+ *
+ * @param onNext $subscribeCallbacksParamOnNext
+ * @param scheduler $subscribeCallbacksParamScheduler
+ * @return $subscribeAllReturn
+ */
def subscribe(onNext: T => Unit, scheduler: Scheduler): Subscription = {
asJava.subscribe(onNext, scheduler)
}
/**
- * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to
+ * Returns a pair of a start function and an [[Observable]] that upon calling the start function causes the source Observable to
* push results into the specified subject.
*
* @param subject
- * the {@link Subject} for the {@link ConnectableObservable} to push source items
- * into
- * @param
+ * Returns an Observable that first emits the items emitted by `this`, and then the items emitted
+ * by `that`.
+ *
*
- *
+ *
*
- * A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of
- * its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
- * {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
+ *
+ * A well-behaved Observable does not interleave its invocations of the [[Observer.onNext onNext]], [[Observer.onCompleted onCompleted]], and [[Observer.onError onError]] methods of
+ * its [[Observer]]s; it invokes `onCompleted` or `onError` only once; and it never invokes `onNext` after invoking either `onCompleted` or `onError`.
+ * `synchronize` enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
*
* @param observable
* the source Observable
- * @param
+ * Wraps each item emitted by a source Observable in a timestamped tuple.
+ *
* This Observable produces connected non-overlapping buffers. The current buffer is
- * emitted and replaced with a new buffer when the Observable produced by the specified {@link Func0} produces a {@link rx.util.Closing} object. The * {@link Func0} will then
+ * This Observable produces connected non-overlapping buffers. The current buffer is
+ * emitted and replaced with a new buffer when the Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. The function will then
* be used to create a new Observable to listen for the end of the next buffer.
*
- * @param bufferClosingSelector
- * The {@link Func0} which is used to produce an {@link Observable} for every buffer created.
- * When this {@link Observable} produces a {@link rx.util.Closing} object, the associated buffer
+ * @param closings
+ * The function which is used to produce an [[Observable]] for every buffer created.
+ * When this [[Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
* is emitted and replaced with a new one.
* @return
- * An {@link Observable} which produces connected non-overlapping buffers, which are emitted
- * when the current {@link Observable} created with the {@link Func0} argument produces a {@link rx.util.Closing} object.
+ * An [[Observable]] which produces connected non-overlapping buffers, which are emitted
+ * when the current [[Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
*/
- def buffer(bufferClosingSelector: () => Observable[Closing]) : Observable[Seq[T]] = {
- val f: Func0[_ <: rx.Observable[_ <: Closing]] = bufferClosingSelector().asJava
+ def buffer(closings: () => Observable[Closing]) : Observable[Seq[T]] = {
+ val f: Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJava
val jObs: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(f)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}
@@ -250,24 +303,24 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values.
*
- * This Observable produces buffers. Buffers are created when the specified "bufferOpenings"
- * Observable produces a {@link rx.util.Opening} object. Additionally the {@link Func0} argument
- * is used to create an Observable which produces {@link rx.util.Closing} objects. When this
+ * This Observable produces buffers. Buffers are created when the specified `openings`
+ * Observable produces a [[rx.lang.scala.util.Opening]] object. Additionally the function argument
+ * is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. When this
* Observable produces such an object, the associated buffer is emitted.
*
- * @param bufferOpenings
- * The {@link Observable} which, when it produces a {@link rx.util.Opening} object, will cause
+ * @param openings
+ * The [[Observable]] which, when it produces a [[rx.lang.scala.util.Opening]] object, will cause
* another buffer to be created.
- * @param bufferClosingSelector
- * The {@link Func0} which is used to produce an {@link Observable} for every buffer created.
- * When this {@link Observable} produces a {@link rx.util.Closing} object, the associated buffer
+ * @param closings
+ * The function which is used to produce an [[Observable]] for every buffer created.
+ * When this [[Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
* is emitted.
* @return
- * An {@link Observable} which produces buffers which are created and emitted when the specified {@link Observable}s publish certain objects.
+ * An [[Observable]] which produces buffers which are created and emitted when the specified [[Observable]]s publish certain objects.
*/
- def buffer(bufferOpenings: Observable[Opening], bufferClosingSelector: Opening => Observable[Closing]): Observable[Seq[T]] = {
- val opening: rx.Observable[_ <: Opening] = bufferOpenings.asJava
- val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => bufferClosingSelector(o).asJava
+ def buffer(openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
+ val opening: rx.Observable[_ <: Opening] = openings.asJava
+ val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJava
val jObs: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(opening, closing)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}
@@ -275,15 +328,15 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values.
*
- * This Observable produces connected non-overlapping buffers, each containing "count"
+ * This Observable produces connected non-overlapping buffers, each containing `count`
* elements. When the source Observable completes or encounters an error, the current
* buffer is emitted, and the event is propagated.
*
* @param count
* The maximum size of each buffer before it should be emitted.
* @return
- * An {@link Observable} which produces connected non-overlapping buffers containing at most
- * "count" produced values.
+ * An [[Observable]] which produces connected non-overlapping buffers containing at most
+ * `count` produced values.
*/
def buffer(count: Int): Observable[Seq[T]] = {
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(count)
@@ -293,18 +346,18 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values.
*
- * This Observable produces buffers every "skip" values, each containing "count"
+ * This Observable produces buffers every `skip` values, each containing `count`
* elements. When the source Observable completes or encounters an error, the current
* buffer is emitted, and the event is propagated.
*
* @param count
* The maximum size of each buffer before it should be emitted.
* @param skip
- * How many produced values need to be skipped before starting a new buffer. Note that when "skip" and
- * "count" are equals that this is the same operation as {@link Observable#buffer(int)}.
+ * How many produced values need to be skipped before starting a new buffer. Note that when `skip` and
+ * `count` are equals that this is the same operation as `buffer(int)`.
* @return
- * An {@link Observable} which produces buffers every "skipped" values containing at most
- * "count" produced values.
+ * An [[Observable]] which produces buffers every `skip` values containing at most
+ * `count` produced values.
*/
def buffer(count: Int, skip: Int): Observable[Seq[T]] = {
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(count, skip)
@@ -314,15 +367,15 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values.
*
- * This Observable produces connected non-overlapping buffers, each of a fixed duration
- * specified by the "timespan" argument. When the source Observable completes or encounters
+ * This Observable produces connected non-overlapping buffers, each of a fixed duration
+ * specified by the `timespan` argument. When the source Observable completes or encounters
* an error, the current buffer is emitted and the event is propagated.
*
* @param timespan
* The period of time each buffer is collecting values before it should be emitted, and
* replaced with a new buffer.
* @return
- * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration.
+ * An [[Observable]] which produces connected non-overlapping buffers with a fixed duration.
*/
def buffer(timespan: Duration): Observable[Seq[T]] = {
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(timespan.length, timespan.unit)
@@ -332,17 +385,17 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values.
*
- * This Observable produces connected non-overlapping buffers, each of a fixed duration
- * specified by the "timespan" argument. When the source Observable completes or encounters
+ * This Observable produces connected non-overlapping buffers, each of a fixed duration
+ * specified by the `timespan` argument. When the source Observable completes or encounters
* an error, the current buffer is emitted and the event is propagated.
*
* @param timespan
* The period of time each buffer is collecting values before it should be emitted, and
* replaced with a new buffer.
* @param scheduler
- * The {@link Scheduler} to use when determining the end and start of a buffer.
+ * The [[Scheduler]] to use when determining the end and start of a buffer.
* @return
- * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration.
+ * An [[Observable]] which produces connected non-overlapping buffers with a fixed duration.
*/
def buffer(timespan: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(timespan.length, timespan.unit, scheduler)
@@ -351,8 +404,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
- * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size
- * specified by the "count" argument (which ever is reached first). When the source Observable completes
+ * non-overlapping buffers, each of a fixed duration specified by the `timespan` argument or a maximum size
+ * specified by the `count` argument (which ever is reached first). When the source Observable completes
* or encounters an error, the current buffer is emitted and the event is propagated.
*
* @param timespan
@@ -361,7 +414,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param count
* The maximum size of each buffer before it should be emitted.
* @return
- * An {@link Observable} which produces connected non-overlapping buffers which are emitted after
+ * An [[Observable]] which produces connected non-overlapping buffers which are emitted after
* a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
*/
def buffer(timespan: Duration, count: Int): Observable[Seq[T]] = {
@@ -371,8 +424,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
- * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size
- * specified by the "count" argument (which ever is reached first). When the source Observable completes
+ * non-overlapping buffers, each of a fixed duration specified by the `timespan` argument or a maximum size
+ * specified by the `count` argument (which ever is reached first). When the source Observable completes
* or encounters an error, the current buffer is emitted and the event is propagated.
*
* @param timespan
@@ -381,9 +434,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param count
* The maximum size of each buffer before it should be emitted.
* @param scheduler
- * The {@link Scheduler} to use when determining the end and start of a buffer.
+ * The [[Scheduler]] to use when determining the end and start of a buffer.
* @return
- * An {@link Observable} which produces connected non-overlapping buffers which are emitted after
+ * An [[Observable]] which produces connected non-overlapping buffers which are emitted after
* a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
*/
def buffer(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Seq[T]] = {
@@ -393,8 +446,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values. This Observable starts a new buffer
- * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan
- * specified by the "timespan" argument. When the source Observable completes or encounters an error, the
+ * periodically, which is determined by the `timeshift` argument. Each buffer is emitted after a fixed timespan
+ * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
* current buffer is emitted and the event is propagated.
*
* @param timespan
@@ -402,7 +455,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param timeshift
* The period of time after which a new buffer will be created.
* @return
- * An {@link Observable} which produces new buffers periodically, and these are emitted after
+ * An [[Observable]] which produces new buffers periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
def buffer(timespan: Duration, timeshift: Duration): Observable[Seq[T]] = {
@@ -415,8 +468,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces buffers of collected values. This Observable starts a new buffer
- * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan
- * specified by the "timespan" argument. When the source Observable completes or encounters an error, the
+ * periodically, which is determined by the `timeshift` argument. Each buffer is emitted after a fixed timespan
+ * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
* current buffer is emitted and the event is propagated.
*
* @param timespan
@@ -424,9 +477,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param timeshift
* The period of time after which a new buffer will be created.
* @param scheduler
- * The {@link Scheduler} to use when determining the end and start of a buffer.
+ * The [[Scheduler]] to use when determining the end and start of a buffer.
* @return
- * An {@link Observable} which produces new buffers periodically, and these are emitted after
+ * An [[Observable]] which produces new buffers periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
def buffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
@@ -440,19 +493,20 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
* non-overlapping windows. The current window is emitted and replaced with a new window when the
- * Observable produced by the specified {@link Func0} produces a {@link rx.util.Closing} object. The {@link Func0} will then be used to create a new Observable to listen for the end of the next
+ * Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object.
+ * The function will then be used to create a new Observable to listen for the end of the next
* window.
*
- * @param closingSelector
- * The {@link Func0} which is used to produce an {@link Observable} for every window created.
- * When this {@link Observable} produces a {@link rx.util.Closing} object, the associated window
+ * @param closings
+ * The function which is used to produce an [[Observable]] for every window created.
+ * When this [[Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
* is emitted and replaced with a new one.
* @return
- * An {@link Observable} which produces connected non-overlapping windows, which are emitted
- * when the current {@link Observable} created with the {@link Func0} argument produces a {@link rx.util.Closing} object.
+ * An [[Observable]] which produces connected non-overlapping windows, which are emitted
+ * when the current [[Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
*/
- def window(closingSelector: () => Observable[Closing]): Observable[Observable[T]] = {
- val func : Func0[_ <: rx.Observable[_ <: Closing]] = closingSelector().asJava
+ def window(closings: () => Observable[Closing]): Observable[Observable[T]] = {
+ val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJava
val o1: rx.Observable[_ <: rx.Observable[_]] = asJava.window(func)
val o2 = new Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
@@ -463,36 +517,36 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces windows.
- * Chunks are created when the specified "windowOpenings" Observable produces a {@link rx.util.Opening} object.
- * Additionally the {@link Func0} argument is used to create an Observable which produces {@link rx.util.Closing} objects. When this Observable produces such an object, the associated window is
- * emitted.
+ * Chunks are created when the specified `openings` Observable produces a [[rx.lang.scala.util.Opening]] object.
+ * Additionally the `closings` argument is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects.
+ * When this Observable produces such an object, the associated window is emitted.
*
- * @param windowOpenings
- * The {@link Observable} which when it produces a {@link rx.util.Opening} object, will cause
+ * @param openings
+ * The [[Observable]] which when it produces a [[rx.lang.scala.util.Opening]] object, will cause
* another window to be created.
- * @param closingSelector
- * The {@link Func0} which is used to produce an {@link Observable} for every window created.
- * When this {@link Observable} produces a {@link rx.util.Closing} object, the associated window
+ * @param closings
+ * The function which is used to produce an [[Observable]] for every window created.
+ * When this [[Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
* is emitted.
* @return
- * An {@link Observable} which produces windows which are created and emitted when the specified {@link Observable}s publish certain objects.
+ * An [[Observable]] which produces windows which are created and emitted when the specified [[Observable]]s publish certain objects.
*/
- def window(windowOpenings: Observable[Opening], closingSelector: Opening => Observable[Closing]) = {
+ def window(openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
Observable.jObsOfJObsToScObsOfScObs(
- asJava.window(windowOpenings.asJava, (op: Opening) => closingSelector(op).asJava))
+ asJava.window(openings.asJava, (op: Opening) => closings(op).asJava))
: Observable[Observable[T]] // SI-7818
}
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
- * non-overlapping windows, each containing "count" elements. When the source Observable completes or
+ * non-overlapping windows, each containing `count` elements. When the source Observable completes or
* encounters an error, the current window is emitted, and the event is propagated.
*
* @param count
* The maximum size of each window before it should be emitted.
* @return
- * An {@link Observable} which produces connected non-overlapping windows containing at most
- * "count" produced values.
+ * An [[Observable]] which produces connected non-overlapping windows containing at most
+ * `count` produced values.
*/
def window(count: Int): Observable[Observable[T]] = {
// this unnecessary ascription is needed because of this bug (without, compiler crashes):
@@ -502,17 +556,17 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces windows every
- * "skip" values, each containing "count" elements. When the source Observable completes or encounters an error,
+ * `skip` values, each containing `count` elements. When the source Observable completes or encounters an error,
* the current window is emitted and the event is propagated.
*
* @param count
* The maximum size of each window before it should be emitted.
* @param skip
- * How many produced values need to be skipped before starting a new window. Note that when "skip" and
- * "count" are equals that this is the same operation as {@link Observable#window(Observable, int)}.
+ * How many produced values need to be skipped before starting a new window. Note that when `skip` and
+ * `count` are equal that this is the same operation as `window(int)`.
* @return
- * An {@link Observable} which produces windows every "skipped" values containing at most
- * "count" produced values.
+ * An [[Observable]] which produces windows every `skip` values containing at most
+ * `count` produced values.
*/
def window(count: Int, skip: Int): Observable[Observable[T]] = {
Observable.jObsOfJObsToScObsOfScObs(asJava.window(count, skip))
@@ -521,14 +575,14 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
- * non-overlapping windows, each of a fixed duration specified by the "timespan" argument. When the source
+ * non-overlapping windows, each of a fixed duration specified by the `timespan` argument. When the source
* Observable completes or encounters an error, the current window is emitted and the event is propagated.
*
* @param timespan
* The period of time each window is collecting values before it should be emitted, and
* replaced with a new window.
* @return
- * An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
+ * An [[Observable]] which produces connected non-overlapping windows with a fixed duration.
*/
def window(timespan: Duration): Observable[Observable[T]] = {
Observable.jObsOfJObsToScObsOfScObs(asJava.window(timespan.length, timespan.unit))
@@ -537,16 +591,16 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
- * non-overlapping windows, each of a fixed duration specified by the "timespan" argument. When the source
+ * non-overlapping windows, each of a fixed duration specified by the `timespan` argument. When the source
* Observable completes or encounters an error, the current window is emitted and the event is propagated.
*
* @param timespan
* The period of time each window is collecting values before it should be emitted, and
* replaced with a new window.
* @param scheduler
- * The {@link Scheduler} to use when determining the end and start of a window.
+ * The [[Scheduler]] to use when determining the end and start of a window.
* @return
- * An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
+ * An [[Observable]] which produces connected non-overlapping windows with a fixed duration.
*/
def window(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
Observable.jObsOfJObsToScObsOfScObs(asJava.window(timespan.length, timespan.unit, scheduler))
@@ -555,8 +609,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
- * non-overlapping windows, each of a fixed duration specified by the "timespan" argument or a maximum size
- * specified by the "count" argument (which ever is reached first). When the source Observable completes
+ * non-overlapping windows, each of a fixed duration specified by the `timespan` argument or a maximum size
+ * specified by the `count` argument (which ever is reached first). When the source Observable completes
* or encounters an error, the current window is emitted and the event is propagated.
*
* @param timespan
@@ -565,7 +619,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param count
* The maximum size of each window before it should be emitted.
* @return
- * An {@link Observable} which produces connected non-overlapping windows which are emitted after
+ * An [[Observable]] which produces connected non-overlapping windows which are emitted after
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
*/
def window(timespan: Duration, count: Int): Observable[Observable[T]] = {
@@ -575,8 +629,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
- * non-overlapping windows, each of a fixed duration specified by the "timespan" argument or a maximum size
- * specified by the "count" argument (which ever is reached first). When the source Observable completes
+ * non-overlapping windows, each of a fixed duration specified by the `timespan` argument or a maximum size
+ * specified by the `count` argument (which ever is reached first). When the source Observable completes
* or encounters an error, the current window is emitted and the event is propagated.
*
* @param timespan
@@ -585,9 +639,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param count
* The maximum size of each window before it should be emitted.
* @param scheduler
- * The {@link Scheduler} to use when determining the end and start of a window.
+ * The [[Scheduler]] to use when determining the end and start of a window.
* @return
- * An {@link Observable} which produces connected non-overlapping windows which are emitted after
+ * An [[Observable]] which produces connected non-overlapping windows which are emitted after
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
*/
def window(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
@@ -597,8 +651,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable starts a new window
- * periodically, which is determined by the "timeshift" argument. Each window is emitted after a fixed timespan
- * specified by the "timespan" argument. When the source Observable completes or encounters an error, the
+ * periodically, which is determined by the `timeshift` argument. Each window is emitted after a fixed timespan
+ * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
* current window is emitted and the event is propagated.
*
* @param timespan
@@ -606,7 +660,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param timeshift
* The period of time after which a new window will be created.
* @return
- * An {@link Observable} which produces new windows periodically, and these are emitted after
+ * An [[Observable]] which produces new windows periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
def window(timespan: Duration, timeshift: Duration): Observable[Observable[T]] = {
@@ -619,8 +673,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Creates an Observable which produces windows of collected values. This Observable starts a new window
- * periodically, which is determined by the "timeshift" argument. Each window is emitted after a fixed timespan
- * specified by the "timespan" argument. When the source Observable completes or encounters an error, the
+ * periodically, which is determined by the `timeshift` argument. Each window is emitted after a fixed timespan
+ * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
* current window is emitted and the event is propagated.
*
* @param timespan
@@ -628,9 +682,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @param timeshift
* The period of time after which a new window will be created.
* @param scheduler
- * The {@link Scheduler} to use when determining the end and start of a window.
+ * The [[Scheduler]] to use when determining the end and start of a window.
* @return
- * An {@link Observable} which produces new windows periodically, and these are emitted after
+ * An [[Observable]] which produces new windows periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
def window(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
@@ -642,27 +696,27 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- *
+ * Returns an Observable which only emits those items for which a given predicate holds.
+ *
*
+ * Registers an function to be called when this Observable invokes [[Observer.onCompleted onCompleted]] or [[Observer.onError onError]].
+ *
*
+ *
*
- * Note: {@code mapMany} and {@code flatMap} are equivalent.
*
* @param func
* a function that, when applied to an item emitted by the source Observable, returns
@@ -683,7 +735,6 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* @return an Observable that emits the result of applying the transformation function to each
* item emitted by the source Observable and merging the results of the Observables
* obtained from this transformation.
- * @see #mapMany(Func1)
*/
def flatMap[R](f: T => Observable[R]): Observable[R] = {
Observable[R](asJava.flatMap[R]((t: T) => f(t).asJava))
@@ -692,7 +743,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable and emits the result.
- *
+ *
*
+ * Turns all of the notifications from a source Observable into [[Observer.onNext onNext]] emissions, and marks them with their original notification types within [[Notification]] objects.
+ *
*
+ * Asynchronously subscribes and unsubscribes Observers on the specified [[Scheduler]].
+ *
*
+ * Asynchronously notify [[Observer]]s on the specified [[Scheduler]].
+ *
*
+ *
+ * This operation is only available if `this` is of type `Observable[Notification[U]]` for some `U`,
+ * otherwise you will get a compilation error.
+ *
*
+ * Instruct an Observable to pass control to another Observable rather than invoking [[Observer.onError onError]] if it encounters an error.
+ *
*
+ *
* By default, when an Observable encounters an error that prevents it from emitting the
- * expected item to its {@link Observer}, the Observable invokes its Observer's
- *
+ *
* You can use this to prevent errors from propagating or to supply fallback data should errors
* be encountered.
*
@@ -791,21 +851,22 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error.
- *
+ * Instruct an Observable to pass control to another Observable rather than invoking [[Observer.onError onError]] if it encounters an error.
+ *
*
+ *
* By default, when an Observable encounters an error that prevents it from emitting the
- * expected item to its {@link Observer}, the Observable invokes its Observer's
- *
+ *
* You can use this to prevent errors from propagating or to supply fallback data should errors
* be encountered.
*
@@ -821,23 +882,24 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error of type {@link java.lang.Exception}.
- *
- * This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through.
- *
+ * Instruct an Observable to pass control to another Observable rather than invoking [[Observer.onError onError]] if it encounters an error of type `java.lang.Exception`.
+ *
+ * This differs from `Observable.onErrorResumeNext` in that this one does not handle `java.lang.Throwable` or `java.lang.Error` but lets those continue through.
+ *
*
+ *
* By default, when an Observable encounters an error that prevents it from emitting the
- * expected item to its {@link Observer}, the Observable invokes its Observer's
- *
+ *
* You can use this to prevent errors from propagating or to supply fallback data should errors
* be encountered.
*
@@ -854,19 +916,19 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Instruct an Observable to emit an item (returned by a specified function) rather than
- * invoking {@link Observer#onError onError} if it encounters an error.
- *
+ * invoking [[Observer.onError onError]] if it encounters an error.
+ *
*
+ *
* By default, when an Observable encounters an error that prevents it from emitting the
- * expected item to its {@link Observer}, the Observable invokes its Observer's
- *
+ * `onError` method, it will instead pass the return value of
+ * `resumeFunction` to the Observer's [[Observer.onNext onNext]] method.
+ *
* You can use this to prevent errors from propagating or to supply fallback data should errors
* be encountered.
*
@@ -887,20 +949,18 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* by the source Observable into the same function, and so on until all items have been emitted
* by the source Observable, and emits the final result from the final call to your function as
* its sole item.
- *
+ *
*
+ *
* This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold,"
* "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance,
- * has an
+ * Returns a pair of a start function and an [[Observable]] that shares a single subscription to the underlying
+ * Observable that will replay all of its items and notifications to any future [[Observer]].
+ *
*
+ * This method has similar behavior to [[Observable.replay]] except that this auto-subscribes to
+ * the source Observable rather than returning a start function and an Observable.
+ *
*
+ *
* This is useful when you want an Observable to cache responses and you can't control the
- * subscribe/unsubscribe behavior of all the {@link Observer}s.
- *
+ * subscribe/unsubscribe behavior of all the [[Observer]]s.
+ *
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use the
- *
+ * Returns a a pair of a start function and an [[Observable]], which waits until the start function is called before it begins emitting
+ * items to those [[Observer]]s that have subscribed to it.
+ *
*
+ *
*
+ *
* This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold,"
* "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance,
- * has an
+ *
*
+ *
*
+ *
*
+ *
* This sort of function is sometimes called an accumulator.
- *
- * Note that when you pass a seed to
+ *
*
+ *
*
- * You can ignore the first
+ *
*
+ *
*
- * This method returns an Observable that will invoke a subscribing {@link Observer}'s {@link Observer#onNext onNext} function a maximum of
+ *
*
- *
+ *
*
+ * `other` Observable emits an item.
+ *
*
+ *
*
- * Normally, an Observable that returns multiple items will do so by invoking its {@link Observer}'s {@link Observer#onNext onNext} method for each such item. You can change
+ *
+ * Normally, an Observable that returns multiple items will do so by invoking its [[Observer]]'s
+ * [[Observer.onNext onNext]] method for each such item. You can change
* this behavior, instructing the Observable to compose a list of all of these items and then to
- * invoke the Observer's
+ * invoke the Observer's `onNext` function once, passing it the entire list, by
+ * calling the Observable's `toList` method prior to calling its `Observable.subscribe` method.
+ *
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
@@ -1210,10 +1249,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
*
* @param f
* a function that extracts the key from an item
- * @param
+ *
*
+ *
*
+ *
* You can combine items emitted by two Observables so that they act like a single
- * Observable by using the {@code merge} method.
+ * Observable by using the `merge` method.
*
* @param that
* an Observable to be merged
- * @return an Observable that emits items from {@code this} and {@code that} until
- * {@code this} or {@code that} emits {@code onError} or {@code onComplete}.
+ * @return an Observable that emits items from `this` and `that` until
+ * `this` or `that` emits `onError` or `onComplete`.
*/
def merge[U >: T](that: Observable[U]): Observable[U] = {
val thisJava: rx.Observable[_ <: U] = this.asJava
@@ -1261,39 +1306,45 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * This behaves like {@link #merge(Observable)} except that if any of the merged Observables
- * notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
+ * This behaves like [[Observable.merge]] except that if any of the merged Observables
+ * notify of an error via [[Observer.onError onError]], `mergeDelayError` will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
- *
+ *
*
- * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its
+ *
+ * Even if multiple merged Observables send `onError` notifications, `mergeDelayError` will only invoke the `onError` method of its
* Observers once.
- *
+ *
* This method allows an Observer to receive all successfully emitted items from all of the
* source Observables without being interrupted by an error notification from one of them.
*
* @param that
* an Observable to be merged
* @return an Observable that emits items that are the result of flattening the items emitted by
- * {$code this} and {$code that}
+ * `this` and `that`
*/
def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = {
Observable[U](rx.Observable.mergeDelayError[U](this.asJava, that.asJava))
}
/**
- * Flattens the sequence of Observables emitted by {@code this} into one Observable, without any
+ * Flattens the sequence of Observables emitted by `this` into one Observable, without any
* transformation.
- *
+ *
*
+ *
* You can combine the items emitted by multiple Observables so that they act like a single
* Observable by using this method.
*
+ * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
+ * otherwise you'll get a compilation error.
+ *
* @return an Observable that emits items that are the result of flattening the items emitted
- * by the Observables emitted by {@code this}
+ * by the Observables emitted by `this`
+ *
+ * @usecase def flatten[U]: Observable[U]
+ * @inheritdoc
*/
def flatten[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
@@ -1304,21 +1355,27 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * This behaves like {@link #flatten(<:<)} except that if any of the merged Observables
- * notify of an error via {@link Observer#onError onError}, this method will
+ * This behaves like `flatten` except that if any of the merged Observables
+ * notify of an error via [[Observer.onError onError]], this method will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
- *
+ *
*
- * Even if multiple merged Observables send {@code onError} notifications, this method will only invoke the {@code onError} method of its
+ *
+ * Even if multiple merged Observables send `onError` notifications, this method will only invoke the `onError` method of its
* Observers once.
- *
+ *
* This method allows an Observer to receive all successfully emitted items from all of the
* source Observables without being interrupted by an error notification from one of them.
+ *
+ * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
+ * otherwise you'll get a compilation error.
*
* @return an Observable that emits items that are the result of flattening the items emitted by
* the Observables emitted by the this Observable
+ *
+ * @usecase def flattenDelayError[U]: Observable[U]
+ * @inheritdoc
*/
def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
@@ -1344,24 +1401,18 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
- *
+ *
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
- *
+ *
*
- * Information on debounce vs throttle:
- *
- *
+ *
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
- *
+ *
*
- * Information on debounce vs throttle:
- *
- *
+ *
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
- *
+ *
*
- * Information on debounce vs throttle:
- *
- *
+ *
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
- *
+ *
*
- * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
- *
+ *
+ * This differs from `Observable.throttleLast` in that this only tracks passage of time whereas `Observable.throttleLast` ticks at scheduled intervals.
+ *
*
- * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
- *
+ *
+ * This differs from `Observable.throttleLast` in that this only tracks passage of time whereas `Observable.throttleLast` ticks at scheduled intervals.
+ *
*
- * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
- *
+ *
+ * This differs from `Observable.throttleFirst` in that this ticks along at a scheduled interval whereas `Observable.throttleFirst` does not tick, it just tracks passage of time.
+ *
*
- * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
- *
+ *
+ * This differs from `Observable.throttleFirst` in that this ticks along at a scheduled interval whereas `Observable.throttleFirst` does not tick, it just tracks passage of time.
+ *
*
+ *
*
+ * This is just a shorthand for `take(1)`.
+ *
*
+ *
*
+ *
*
- *
- *
+ *
*
- *
+ *
*
- *
+ *
*
+ *
*
- * If {@link Observer#onError} is invoked the source Observable will be re-subscribed to as many times as defined by retryCount.
- *
- * Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
- *
+ *
+ * If [[Observer.onError]] is invoked the source Observable will be re-subscribed to as many times as defined by retryCount.
+ *
+ * Any [[Observer.onNext]] calls received on each attempt will be emitted and concatenated together.
+ *
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
*
@@ -1701,13 +1730,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Retry subscription to origin Observable whenever onError is called (infinite retry count).
- *
+ *
*
- * If {@link Observer#onError} is invoked the source Observable will be re-subscribed to.
- *
- * Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
- *
+ *
+ * If [[Observer.onError]] is invoked the source Observable will be re-subscribed to.
+ *
+ * Any [[Observer.onNext]] calls received on each attempt will be emitted and concatenated together.
+ *
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
* @return Observable with retry logic.
@@ -1717,7 +1746,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
+ * Converts an Observable into a [[rx.lang.scala.observables.BlockingObservable]] (an Observable with blocking
* operators).
*
* @see Blocking Observable Operators
@@ -1727,11 +1756,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Perform work in parallel by sharding an {@code Observable
+ * Creates an Observable that will execute the given function when an [[Observer]] subscribes to it.
+ *
*
- * Write the function you pass to
- * A well-formed Observable must invoke either the Observer's
+ *
+ * A well-formed Observable must invoke either the Observer's `onCompleted` method
+ * exactly once or its `onError` method exactly once.
+ *
* See Rx Design Guidelines (PDF)
* for detailed information.
*
- * @param
+ * Returns an Observable that invokes an [[Observer]]'s [[Observer.onError onError]] method when the Observer subscribes to it
+ *
*
+ *
* Implementation note: the entire array will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
+ * Implementation note: the entire array will be immediately emitted each time an [[Observer]] subscribes.
+ * Since this occurs before the [[Subscription]] is returned,
* it in not possible to unsubscribe from the sequence before it completes.
*
* @param items
* the source Array
- * @param
*
+ *
* The defer operator allows you to defer or delay emitting items from an Observable until such
- * time as an Observer subscribes to the Observable. This allows an {@link Observer} to easily
+ * time as an Observer subscribes to the Observable. This allows an [[Observer]] to easily
* obtain updates or a refreshed version of the sequence.
*
* @param observableFactory
- * the Observable factory function to invoke for each {@link Observer} that
+ * the Observable factory function to invoke for each [[Observer]] that
* subscribes to the resulting Observable
- * @param
- *
- * To convert any object into an Observable that emits that object, pass that object into the
- *
- * This is similar to the {@link #apply(Iterable[T])} method, except that
- * {@link #apply(Iterable[T])} will convert an {@link Iterable} object into an Observable that emits
- * each of the items in the Iterable, one at a time, while the
+ * Returns an Observable that never sends any items or notifications to an [[Observer]].
+ *
*
+ *
* This Observable is useful primarily for testing purposes.
*
- * @return an Observable that never sends any items or notifications to an {@link Observer}
+ * @return an Observable that never sends any items or notifications to an [[Observer]]
*/
def never: Observable[Nothing] = {
Observable[Nothing](JObservable.never())
}
- // TODO also support Scala Futures, but think well before. Do we want to Future and Observable
- // to share a common base interface?
-
- // private because it's not RxScala's responsability to provide this alias
- private type Future[+T] = java.util.concurrent.Future[_ <: T]
-
- def apply[T](f: Future[T]): Observable[T] = {
- Observable[T](rx.Observable.from(f))
- }
-
- def apply[T](f: Future[T], scheduler: Scheduler): Observable[T] = {
- Observable[T](rx.Observable.from(f, scheduler))
- }
-
- def apply[T](f: Future[T], duration: Duration): Observable[T] = {
- Observable[T](rx.Observable.from(f, duration.length, duration.unit))
- }
-
/**
- * Given a Seq of N observables, returns an observable that emits Seqs of N elements each.
- * The first emitted Seq will contain the first element of each source observable,
- * the second Seq the second element of each source observable, and so on.
+ * Given 3 observables, returns an observable that emits Tuples of 3 elements each.
+ * The first emitted Tuple will contain the first element of each source observable,
+ * the second Tuple the second element of each source observable, and so on.
*
- * @param observables
- * A Seq of source Observables
- * @return an Observable that emits the zipped Seqs
+ * @return an Observable that emits the zipped Observables
*/
- def zip[T](observables: Seq[Observable[T]]): Observable[Seq[T]] = {
- val f: FuncN[Seq[T]] = (args: Seq[java.lang.Object]) => {
- val asSeq: Seq[Object] = args.toSeq
- asSeq.asInstanceOf[Seq[T]]
- }
- val list = observables.map(_.asJava).asJava
- val o = rx.Observable.zip(list, f)
- Observable[Seq[T]](o)
+ def zip[A, B, C](obA: Observable[A], obB: Observable[B], obC: Observable[C]): Observable[(A, B, C)] = {
+ Observable[(A, B, C)](rx.Observable.zip[A, B, C, (A, B, C)](obA.asJava, obB.asJava, obC.asJava, (a: A, b: B, c: C) => (a, b, c)))
}
/**
- * Given an Observable emitting N source observables, returns an observable that emits Seqs of N elements each.
+ * Given 4 observables, returns an observable that emits Tuples of 4 elements each.
+ * The first emitted Tuple will contain the first element of each source observable,
+ * the second Tuple the second element of each source observable, and so on.
+ *
+ * @return an Observable that emits the zipped Observables
+ */
+ def zip[A, B, C, D](obA: Observable[A], obB: Observable[B], obC: Observable[C], obD: Observable[D]): Observable[(A, B, C, D)] = {
+ Observable[(A, B, C, D)](rx.Observable.zip[A, B, C, D, (A, B, C, D)](obA.asJava, obB.asJava, obC.asJava, obD.asJava, (a: A, b: B, c: C, d: D) => (a, b, c, d)))
+ }
+
+ /**
+ * Given an Observable emitting `N` source observables, returns an observable that
+ * emits Seqs of `N` elements each.
* The first emitted Seq will contain the first element of each source observable,
* the second Seq the second element of each source observable, and so on.
*
+ * Note that the returned Observable will only start emitting items once the given
+ * `Observable[Observable[T]]` has completed, because otherwise it cannot know `N`.
+ *
* @param observables
* An Observable emitting N source Observables
* @return an Observable that emits the zipped Seqs
@@ -1967,11 +2001,31 @@ object Observable {
val o = rx.Observable.zip(list, f)
Observable[Seq[T]](o)
}
-
+
+ /**
+ * Emits `0`, `1`, `2`, `...` with a delay of `duration` between consecutive numbers.
+ *
+ *
+ *
* NOTE: This will block even if the Observable is asynchronous.
- *
+ *
* This is similar to {@link Observable#subscribe(Observer)}, but it blocks. Because it blocks it does
* not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods.
- *
+ *
* Observable<T>
instance is responsible for accepting all subscriptions
- * and notifying all Observers. Unless the documentation for a particular
- * Observable<T>
implementation indicates otherwise, Observers should make no
- * assumptions about the order in which multiple Observers will receive their notifications.
- *
*
* @param that
@@ -154,13 +209,14 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Returns an Observable that emits the items emitted by two or more Observables, one after the
+ * Returns an Observable that emits the items emitted by several Observables, one after the
* other.
- *
- *
- * @return an Observable that emits items that are the result of combining the items emitted by
- * the source Observables, one after the other
+ *
+ * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
+ * otherwise you'll get a compilation error.
+ *
+ * @usecase def concat[U]: Observable[U]
+ * @inheritdoc
*/
def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
@@ -173,47 +229,44 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Wraps this Observable in another Observable that ensures that the resulting
* Observable is chronologically well-behaved.
- *
- *
*
* @return an Observable that emits timestamped items from the source Observable
*/
- def timestamp: Observable[Timestamped[T]] = {
- Observable[Timestamped[T]](asJava.timestamp())
+ def timestamp: Observable[(Long, T)] = {
+ Observable[rx.util.Timestamped[_ <: T]](asJava.timestamp())
+ .map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue()))
}
/**
* Returns an Observable formed from this Observable and another Observable by combining
* corresponding elements in pairs.
- * The number of {@code onNext} invocations of the resulting {@code Observable[(T, U)]}
- * is the minumum of the number of {@code onNext} invocations of {@code this} and {@code that}.
+ * The number of `onNext` invocations of the resulting `Observable[(T, U)]`
+ * is the minumum of the number of `onNext` invocations of `this` and `that`.
*/
def zip[U](that: Observable[U]): Observable[(T, U)] = {
Observable[(T, U)](JObservable.zip[T, U, (T, U)](this.asJava, that.asJava, (t: T, u: U) => (t, u)))
}
-
- // public static
*
* @param predicate
- * a function that evaluates the items emitted by the source Observable, returning {@code true} if they pass the filter
+ * a function that evaluates the items emitted by the source Observable, returning `true` if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter
- * evaluates as {@code true}
+ * evaluates as `true`
*/
def filter(predicate: T => Boolean): Observable[T] = {
Observable[T](asJava.filter(predicate))
}
/**
- * Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
- *
*
* @param action
- * an {@link Action0} to be invoked when the source Observable finishes
- * @return an Observable that emits the same items as the source Observable, then invokes the {@link Action0}
- * @see MSDN: Observable.Finally Method
+ * an function to be invoked when the source Observable finishes
+ * @return an Observable that emits the same items as the source Observable, then invokes the function
*/
def finallyDo(action: () => Unit): Observable[T] = {
Observable[T](asJava.finallyDo(action))
@@ -672,10 +726,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
* resulting Observables and emitting the results of this merger.
- *
- *
*
* @param func
@@ -705,77 +756,86 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Turns all of the notifications from a source Observable into {@link Observer#onNext onNext} emissions, and marks them with their original notification types within {@link Notification} objects.
- *
*
* @return an Observable whose items are the result of materializing the items and
* notifications of the source Observable
- * @see MSDN: Observable.materialize
*/
def materialize: Observable[Notification[T]] = {
- Observable[Notification[T]](asJava.materialize())
+ Observable[rx.Notification[_ <: T]](asJava.materialize()).map(Notification(_))
}
/**
- * Asynchronously subscribes and unsubscribes Observers on the specified {@link Scheduler}.
- *
*
* @param scheduler
- * the {@link Scheduler} to perform subscription and unsubscription actions on
+ * the [[Scheduler]] to perform subscription and unsubscription actions on
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
- * on the specified {@link Scheduler}
+ * on the specified [[Scheduler]]
*/
def subscribeOn(scheduler: Scheduler): Observable[T] = {
Observable[T](asJava.subscribeOn(scheduler))
}
/**
- * Asynchronously notify {@link Observer}s on the specified {@link Scheduler}.
- *
*
* @param scheduler
- * the {@link Scheduler} to notify {@link Observer}s on
- * @return the source Observable modified so that its {@link Observer}s are notified on the
- * specified {@link Scheduler}
+ * the [[Scheduler]] to notify [[Observer]]s on
+ * @return the source Observable modified so that its [[Observer]]s are notified on the
+ * specified [[Scheduler]]
*/
def observeOn(scheduler: Scheduler): Observable[T] = {
Observable[T](asJava.observeOn(scheduler))
}
/**
- * Returns an Observable that reverses the effect of {@link #materialize materialize} by
- * transforming the {@link Notification} objects emitted by the source Observable into the items
+ * Returns an Observable that reverses the effect of [[Observable.materialize]] by
+ * transforming the [[Notification]] objects emitted by the source Observable into the items
* or notifications they represent.
- *
*
- * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects emitted by the source Observable
+ * @return an Observable that emits the items and notifications embedded in the [[Notification]] objects emitted by the source Observable
+ *
+ * @usecase def dematerialize[U]: Observable[U]
+ * @inheritdoc
+ *
*/
// with =:= it does not work, why?
- def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U] = {
- val o = asJava.dematerialize[U]()
- Observable[U](o)
+ def dematerialize[U](implicit evidence: Observable[T] <:< Observable[Notification[U]]): Observable[U] = {
+ val o1: Observable[Notification[U]] = this
+ val o2: Observable[rx.Notification[_ <: U]] = o1.map(_.asJava)
+ val o3 = o2.asJava.dematerialize[U]()
+ Observable[U](o3)
}
/**
- * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error.
- *
- *
onError
method, and then quits without invoking any more of its Observer's
- * methods. The onErrorResumeNext
method changes this behavior. If you pass a
- * function that returns an Observable (resumeFunction
) to
- * onErrorResumeNext
, if the original Observable encounters an error, instead of
- * invoking its Observer's onError
method, it will instead relinquish control to
- * the Observable returned from resumeFunction
, which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no
- * Observable necessarily invokes onError
, the Observer may never know that an
+ * expected item to its [[Observer]], the Observable invokes its Observer's
+ * `onError` method, and then quits without invoking any more of its Observer's
+ * methods. The `onErrorResumeNext` method changes this behavior. If you pass a
+ * function that returns an Observable (`resumeFunction`) to
+ * `onErrorResumeNext`, if the original Observable encounters an error, instead of
+ * invoking its Observer's `onError` method, it will instead relinquish control to
+ * the Observable returned from `resumeFunction`, which will invoke the Observer's
+ * [[Observer.onNext onNext]] method if it is able to do so. In such a case, because no
+ * Observable necessarily invokes `onError`, the Observer may never know that an
* error happened.
- *
- *
onError
method, and then quits without invoking any more of its Observer's
- * methods. The onErrorResumeNext
method changes this behavior. If you pass
- * another Observable (resumeSequence
) to an Observable's
- * onErrorResumeNext
method, if the original Observable encounters an error,
- * instead of invoking its Observer's onError
method, it will instead relinquish
- * control to resumeSequence
which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no
- * Observable necessarily invokes onError
, the Observer may never know that an
+ * expected item to its [[Observer]], the Observable invokes its Observer's
+ * `onError` method, and then quits without invoking any more of its Observer's
+ * methods. The `onErrorResumeNext` method changes this behavior. If you pass
+ * another Observable (`resumeSequence`) to an Observable's
+ * `onErrorResumeNext` method, if the original Observable encounters an error,
+ * instead of invoking its Observer's `onError` method, it will instead relinquish
+ * control to `resumeSequence` which will invoke the Observer's [[Observer.onNext onNext]]
+ * method if it is able to do so. In such a case, because no
+ * Observable necessarily invokes `onError`, the Observer may never know that an
* error happened.
- *
- *
onError
method, and then quits without invoking any more of its Observer's
- * methods. The onErrorResumeNext
method changes this behavior. If you pass
- * another Observable (resumeSequence
) to an Observable's
- * onErrorResumeNext
method, if the original Observable encounters an error,
- * instead of invoking its Observer's onError
method, it will instead relinquish
- * control to resumeSequence
which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no
- * Observable necessarily invokes onError
, the Observer may never know that an
+ * expected item to its [[Observer]], the Observable invokes its Observer's
+ * `onError` method, and then quits without invoking any more of its Observer's
+ * methods. The `onErrorResumeNext` method changes this behavior. If you pass
+ * another Observable (`resumeSequence`) to an Observable's
+ * `onErrorResumeNext` method, if the original Observable encounters an error,
+ * instead of invoking its Observer's `onError` method, it will instead relinquish
+ * control to `resumeSequence` which will invoke the Observer's [[Observer.onNext onNext]]
+ * method if it is able to do so. In such a case, because no
+ * Observable necessarily invokes `onError`, the Observer may never know that an
* error happened.
- *
- *
onError
method, and then quits without invoking any more of its Observer's
- * methods. The onErrorReturn
method changes this behavior. If you pass a function
- * (resumeFunction
) to an Observable's onErrorReturn
method, if the
+ * expected item to its [[Observer]], the Observable invokes its Observer's
+ * `onError` method, and then quits without invoking any more of its Observer's
+ * methods. The `onErrorReturn` method changes this behavior. If you pass a function
+ * (`resumeFunction`) to an Observable's `onErrorReturn` method, if the
* original Observable encounters an error, instead of invoking its Observer's
- * onError
method, it will instead pass the return value of
- * resumeFunction
to the Observer's {@link Observer#onNext onNext} method.
- *
- *
inject
method that does a similar operation on lists.
+ * has an `inject` method that does a similar operation on lists.
*
* @param accumulator
* An accumulator function to be invoked on each item emitted by the source
* Observable, whose result will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the
* output from the source Observable
- * @see MSDN: Observable.Aggregate
- * @see Wikipedia: Fold (higher-order function)
*/
def reduce[U >: T](f: (U, U) => U): Observable[U] = {
val func: Func2[_ >: U, _ >: U, _ <: U] = f
@@ -909,13 +969,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
- * Observable that will replay all of its items and notifications to any future {@link Observer}.
- *
*
- * @return a pair of a start function and an {@link Observable} such that when the start function
- * is called, the Observable starts to emit items to its {@link Observer}s
+ * @return a pair of a start function and an [[Observable]] such that when the start function
+ * is called, the Observable starts to emit items to its [[Observer]]s
*/
def replay: (() => Subscription, Observable[T]) = {
val javaCO = asJava.replay()
@@ -923,16 +983,16 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * This method has similar behavior to {@link #replay} except that this auto-subscribes to
- * the source Observable rather than returning a {@link ConnectableObservable}.
- *
- *
cache()
operator so be careful not to use this operator on Observables that
+ * `cache()` operator so be careful not to use this operator on Observables that
* emit an infinite or very large number of items that will use up memory.
*
* @return an Observable that when first subscribed to, caches all of its notifications for
@@ -943,13 +1003,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting
- * items to those {@link Observer}s that have subscribed to it.
- *
*
- * @return a pair of a start function and an {@link Observable} such that when the start function
- * is called, the Observable starts to emit items to its {@link Observer}s
+ * @return a pair of a start function and an [[Observable]] such that when the start function
+ * is called, the Observable starts to emit items to its [[Observer]]s
*/
def publish: (() => Subscription, Observable[T]) = {
val javaCO = asJava.publish()
@@ -964,12 +1024,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole
* item.
- *
- *
inject
method that does a similar operation on lists.
+ * has an `inject` method that does a similar operation on lists.
*
* @param initialValue
* the initial (seed) accumulator value
@@ -978,23 +1038,21 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* Observable, the result of which will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the output
* from the items emitted by the source Observable
- * @see MSDN: Observable.Aggregate
- * @see Wikipedia: Fold (higher-order function)
*/
- def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
+ def foldLeft[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
Observable[R](asJava.reduce(initialValue, accumulator))
}
/**
* Returns an Observable that emits the results of sampling the items emitted by the source
* Observable at a specified time interval.
- *
*
* @param period
* the sampling rate
* @param unit
- * the {@link TimeUnit} in which
period
is defined
+ * the [[TimeUnit]] in which `period` is defined
* @return an Observable that emits the results of sampling the items emitted by the source
* Observable at the specified time interval
*/
@@ -1005,15 +1063,15 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that emits the results of sampling the items emitted by the source
* Observable at a specified time interval.
- *
*
* @param period
* the sampling rate
* @param unit
- * the {@link TimeUnit} in which
period
is defined
+ * the [[TimeUnit]] in which `period` is defined
* @param scheduler
- * the {@link Scheduler} to use when sampling
+ * the [[Scheduler]] to use when sampling
* @return an Observable that emits the results of sampling the items emitted by the source
* Observable at the specified time interval
*/
@@ -1026,21 +1084,20 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the result of each of these iterations.
- *
- *
scan()
the resulting Observable will emit
+ *
+ * Note that when you pass a seed to `scan()` the resulting Observable will emit
* that seed as its first emitted item.
*
* @param initialValue
* the initial (seed) accumulator value
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source
- * Observable, whose result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the next accumulator call.
+ * Observable, whose result will be emitted to [[Observer]]s via [[Observer.onNext onNext]] and used in the next accumulator call.
* @return an Observable that emits the results of each call to the accumulator function
- * @see MSDN: Observable.Scan
*/
def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
Observable[R](asJava.scan(initialValue, accumulator))
@@ -1049,34 +1106,31 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that emits a Boolean that indicates whether all of the items emitted by
* the source Observable satisfy a condition.
- *
*
* @param predicate
* a function that evaluates an item and returns a Boolean
- * @return an Observable that emits
true
if all items emitted by the source
- * Observable satisfy the predicate; otherwise, false
+ * @return an Observable that emits `true` if all items emitted by the source
+ * Observable satisfy the predicate; otherwise, `false`
*/
def forall(predicate: T => Boolean): Observable[Boolean] = {
// type mismatch; found : rx.Observable[java.lang.Boolean] required: rx.Observable[_ <: scala.Boolean]
// new Observable[Boolean](asJava.all(predicate))
// it's more fun in Scala:
- this.map(predicate).fold(true)(_ && _)
+ this.map(predicate).foldLeft(true)(_ && _)
}
/**
- * Returns an Observable that skips the first num
items emitted by the source
+ * Returns an Observable that skips the first `num` items emitted by the source
* Observable and emits the remainder.
- *
- *
num
items emitted by an Observable and attend only to
- * those items that come after, by modifying the Observable with the skip
method.
- *
+ *
* @param num
* the number of items to skip
* @return an Observable that is identical to the source Observable except that it does not
- * emit the first num
items that the source emits
+ * emit the first `num` items that the source emits
*/
def drop(n: Int): Observable[T] = {
Observable[T](asJava.skip(n))
@@ -1085,7 +1139,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that bypasses all items from the source Observable as long as the specified
* condition holds true. Emits all further source items as soon as the condition becomes false.
- *
*
* @param predicate
@@ -1098,19 +1152,20 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
}
/**
- * Returns an Observable that emits only the first
num
items emitted by the source
+ * Returns an Observable that emits only the first `num` items emitted by the source
* Observable.
- *
- *
num
times before invoking
- * {@link Observer#onCompleted onCompleted}.
+ *
+ * This method returns an Observable that will invoke a subscribing [[Observer]]'s
+ * [[Observer.onNext onNext]] function a maximum of `num` times before invoking
+ * [[Observer.onCompleted onCompleted]].
*
* @param num
* the number of items to take
- * @return an Observable that emits only the first num
items from the source
+ * @return an Observable that emits only the first `num` items from the source
* Observable, or all of the items from the source Observable if that Observable emits
- * fewer than num
items
+ * fewer than `num` items
*/
def take(n: Int): Observable[T] = {
Observable[T](asJava.take(n))
@@ -1119,46 +1174,29 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that emits items emitted by the source Observable so long as a
* specified condition is true.
- *
*
* @param predicate
* a function that evaluates an item emitted by the source Observable and returns a
* Boolean
* @return an Observable that emits the items from the source Observable so long as each item
- * satisfies the condition defined by
predicate
+ * satisfies the condition defined by `predicate`
*/
def takeWhile(predicate: T => Boolean): Observable[T] = {
Observable[T](asJava.takeWhile(predicate))
}
-
- /**
- * Returns an Observable that emits the items emitted by a source Observable so long as a given
- * predicate remains true, where the predicate can operate on both the item and its index
- * relative to the complete sequence.
- *
- *
- * @param predicate
- * a function to test each item emitted by the source Observable for a condition;
- * the second parameter of the function represents the index of the source item
- * @return an Observable that emits items from the source Observable so long as the predicate
- * continues to return
true
for each item, then completes
- */
- def takeWhileWithIndex(predicate: (T, Integer) => Boolean): Observable[T] = {
- Observable[T](asJava.takeWhileWithIndex(predicate))
- }
/**
- * Returns an Observable that emits only the last count
items emitted by the source
+ * Returns an Observable that emits only the last `count` items emitted by the source
* Observable.
- *
*
* @param count
* the number of items to emit from the end of the sequence emitted by the source
* Observable
- * @return an Observable that emits only the last
count
items emitted by the source
+ * @return an Observable that emits only the last `count` items emitted by the source
* Observable
*/
def takeRight(n: Int): Observable[T] = {
@@ -1167,17 +1205,17 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that emits the items from the source Observable only until the
- * other
Observable emits an item.
- *
*
* @param that
- * the Observable whose first emitted item will cause
takeUntil
to stop
+ * the Observable whose first emitted item will cause `takeUntil` to stop
* emitting items from the source Observable
- * @param other
+ * @tparam E
+ * the type of items emitted by `other`
* @return an Observable that emits the items of the source Observable until such time as
- * other
emits its first item
+ * `other` emits its first item
*/
def takeUntil[E](that: Observable[E]): Observable[T] = {
Observable[T](asJava.takeUntil(that.asJava))
@@ -1186,14 +1224,15 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
- *
- *
onNext
function once, passing it the entire list, by
- * calling the Observable's toList
method prior to calling its {@link #subscribe} method.
- *
*
+ * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
+ * otherwise you'll get a compilation error.
+ *
* @param sequenceOfSequences
* the source Observable that emits Observables
* @return an Observable that emits only the items emitted by the most recently published
* Observable
+ *
+ * @usecase def switch[U]: Observable[U]
+ * @inheritdoc
*/
def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
@@ -1243,16 +1288,16 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Flattens two Observables into one Observable, without any transformation.
- *
- *
- *
- *
- *
- *
- *
+ *
+ * $debounceVsThrottle
*
* @param timeout
- * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
+ * The time each value has to be 'the most recent' of the [[Observable]] to ensure that it's not dropped.
*
- * @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
- * @see {@link #debounce}
+ * @return An [[Observable]] which filters out values which are too quickly followed up with newer values.
+ * @see `Observable.debounce`
*/
def throttleWithTimeout(timeout: Duration): Observable[T] = {
Observable[T](asJava.throttleWithTimeout(timeout.length, timeout.unit))
@@ -1369,24 +1420,18 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
- *
- *
- *
+ *
+ * $debounceVsThrottle
*
* @param timeout
- * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
+ * The time each value has to be 'the most recent' of the [[Observable]] to ensure that it's not dropped.
*
- * @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
- * @see {@link #throttleWithTimeout};
+ * @return An [[Observable]] which filters out values which are too quickly followed up with newer values.
+ * @see `Observable.throttleWithTimeout`
*/
def debounce(timeout: Duration): Observable[T] = {
Observable[T](asJava.debounce(timeout.length, timeout.unit))
@@ -1394,25 +1439,19 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
- *
- *
- *
+ *
+ * $debounceVsThrottle
*
* @param timeout
- * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
+ * The time each value has to be 'the most recent' of the [[Observable]] to ensure that it's not dropped.
* @param scheduler
- * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
+ * The [[Scheduler]] to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
- * @see {@link #throttleWithTimeout};
+ * @see `Observable.throttleWithTimeout`
*/
def debounce(timeout: Duration, scheduler: Scheduler): Observable[T] = {
Observable[T](asJava.debounce(timeout.length, timeout.unit, scheduler))
@@ -1420,17 +1459,17 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
- *
*
* @param timeout
- * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
+ * The time each value has to be 'the most recent' of the [[Observable]] to ensure that it's not dropped.
* @param scheduler
- * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
+ * The [[Scheduler]] to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
- * @see {@link #debounce}
+ * @see `Observable.debounce`
*/
def throttleWithTimeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
Observable[T](asJava.throttleWithTimeout(timeout.length, timeout.unit, scheduler))
@@ -1438,15 +1477,15 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
- *
*
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param scheduler
- * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
+ * The [[Scheduler]] to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
def throttleFirst(skipDuration: Duration, scheduler: Scheduler): Observable[T] = {
@@ -1455,9 +1494,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
- *
*
* @param skipDuration
@@ -1470,15 +1509,14 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
- *
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @return Observable which performs the throttle operation.
- * @see {@link #sample(long, TimeUnit)}
*/
def throttleLast(intervalDuration: Duration): Observable[T] = {
Observable[T](asJava.throttleLast(intervalDuration.length, intervalDuration.unit))
@@ -1486,15 +1524,14 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
- *
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @return Observable which performs the throttle operation.
- * @see {@link #sample(long, TimeUnit, Scheduler)}
*/
def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = {
Observable[T](asJava.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
@@ -1503,58 +1540,103 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that sums up the elements of this Observable.
*
+ * This operation is only available if the elements of this Observable are numbers, otherwise
+ * you will get a compilation error.
+ *
* @return an Observable emitting the sum of all the elements of the source Observable
* as its single item.
+ *
+ * @usecase def sum: Observable[T]
+ * @inheritdoc
*/
def sum[U >: T](implicit num: Numeric[U]): Observable[U] = {
- fold(num.zero)(num.plus)
+ foldLeft(num.zero)(num.plus)
}
/**
* Returns an Observable that multiplies up the elements of this Observable.
*
+ * This operation is only available if the elements of this Observable are numbers, otherwise
+ * you will get a compilation error.
+ *
* @return an Observable emitting the product of all the elements of the source Observable
* as its single item.
+ *
+ * @usecase def product: Observable[T]
+ * @inheritdoc
*/
def product[U >: T](implicit num: Numeric[U]): Observable[U] = {
- fold(num.one)(num.times)
+ foldLeft(num.one)(num.times)
}
/**
* Returns an Observable that emits only the very first item emitted by the source Observable, or
* a default value if the source Observable is empty.
- *
*
- * @param defaultValue
+ * @param default
* The default value to emit if the source Observable doesn't emit anything.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return an Observable that emits only the very first item from the source, or a default value
* if the source Observable completes without emitting any item.
*/
def firstOrElse[U >: T](default: => U): Observable[U] = {
- this.take(1).fold[Option[U]](None)((v: Option[U], e: U) => Some(e)).map({
+ this.take(1).foldLeft[Option[U]](None)((v: Option[U], e: U) => Some(e)).map({
case Some(element) => element
case None => default
})
}
+ /**
+ * Returns an Observable that emits only the very first item emitted by the source Observable, or
+ * a default value if the source Observable is empty.
+ *
+ *
+ *
+ * @param default
+ * The default value to emit if the source Observable doesn't emit anything.
+ * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
+ * @return an Observable that emits only the very first item from the source, or a default value
+ * if the source Observable completes without emitting any item.
+ */
+ def headOrElse[U >: T](default: => U): Observable[U] = firstOrElse(default)
+
/**
* Returns an Observable that emits only the very first item emitted by the source Observable.
- * This is just a shorthand for {@code take(1)}.
- *
*
* @return an Observable that emits only the very first item from the source, or none if the
* source Observable completes without emitting a single item.
*/
- def first: Observable[T] = {
- take(1)
+ def first: Observable[T] = take(1)
+
+ /*
+
+ TODO once https://github.com/Netflix/RxJava/issues/417 is fixed, we can add head and tail methods
+
+ /**
+ * emits NoSuchElementException("head of empty Observable") if empty
+ */
+ def head: Observable[T] = {
+ this.take(1).fold[Option[T]](None)((v: Option[T], e: T) => Some(e)).map({
+ case Some(element) => element
+ case None => throw new NoSuchElementException("head of empty Observable")
+ })
}
-
+
+ /**
+ * emits an UnsupportedOperationException("tail of empty list") if empty
+ */
+ def tail: Observable[T] = ???
+
+ */
+
/**
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
- *
*
* @return an Observable of sequentially distinct items
@@ -1566,7 +1648,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
/**
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially
* distinct according to a key selector function.
- *
*
* @param keySelector
@@ -1578,40 +1660,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
Observable[T](asJava.distinctUntilChanged[U](keySelector))
}
- /**
- * Returns an Observable that forwards all items emitted from the source Observable that are sequentially
- * distinct according to an equality function.
- *
- *
- * @param equality
- * an equality function for deciding whether two emitted items are equal or not
- * @return an Observable of sequentially distinct items
- */
- // def distinctUntilChanged[U](equality: (T, T) => Boolean): Observable[T] = {
- // TODO once https://github.com/Netflix/RxJava/issues/395 is fixed
- // }
-
- /**
- * Returns an Observable that forwards all items emitted from the source Observable that are sequentially
- * distinct according to a key selector function and a comparator.
- *
- *
- * @param keySelector
- * a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially
- * distinct from another one or not
- * @param equality
- * an equality function for deciding whether two emitted item keys are equal or not
- * @return an Observable of sequentially distinct items
- */
- // def distinctUntilChanged[U](keySelector: T => U, equality: (T, T) => Boolean): Observable[T] = {
- // TODO once https://github.com/Netflix/RxJava/issues/395 is fixed
- // }
-
/**
* Returns an Observable that forwards all distinct items emitted from the source Observable.
- *
*
* @return an Observable of distinct items
@@ -1620,24 +1671,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
Observable[T](asJava.distinct())
}
- /**
- * Returns an Observable that forwards all items emitted from the source Observable that are distinct according
- * to a comparator.
- *
- *
- * @param equality
- * an equality function for deciding whether two emitted items are equal or not
- * @return an Observable of distinct items
- */
- // def distinct(equality: (T, T) => Boolean): Observable[T] = {
- // TODO once https://github.com/Netflix/RxJava/issues/395 is fixed
- // }
-
/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according
* to a key selector function.
- *
*
* @param keySelector
@@ -1649,27 +1686,9 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
Observable[T](asJava.distinct[U](keySelector))
}
- /**
- * Returns an Observable that forwards all items emitted from the source Observable that are distinct according
- * to a key selector function and a comparator.
- *
- *
- * @param keySelector
- * a function that projects an emitted item to a key value which is used for deciding whether an item is
- * distinct from another one or not
- * @param equality
- * an equality function for deciding whether two emitted item keys are equal or not
- * @return an Observable of distinct items
- * @see MSDN: Observable.distinct
- */
- // def distinct[U](keySelector: T => U, equality: (T, T) => Boolean): Observable[T] = {
- // TODO once https://github.com/Netflix/RxJava/issues/395 is fixed
- //}
-
/**
* Returns an Observable that counts the total number of elements in the source Observable.
- *
*
* @return an Observable emitting the number of counted elements of the source Observable
@@ -1678,16 +1697,26 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
def length: Observable[Int] = {
Observable[Integer](asJava.count()).map(_.intValue())
}
+
+ /**
+ * Returns an Observable that counts the total number of elements in the source Observable.
+ *
+ *
+ *
+ * @return an Observable emitting the number of counted elements of the source Observable
+ * as its single item.
+ */
+ def size: Observable[Int] = length
/**
* Retry subscription to origin Observable upto given retry count.
- *
- *
- *
- *
create
so that it behaves as an Observable: It
- * should invoke the Observer's {@link Observer#onNext onNext}, {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods
+ *
+ * Write the function you pass to `create` so that it behaves as an Observable: It
+ * should invoke the Observer's [[Observer.onNext onNext]], [[Observer.onError onError]], and [[Observer.onCompleted onCompleted]] methods
* appropriately.
- * onCompleted
method
- * exactly once or its onError
method exactly once.
- *
*
* @param exception
* the particular error to report
- * @param
*
- *
+ *
+ * Implementation note: the entire range will be immediately emitted each time an [[Observer]] subscribes.
+ * Since this occurs before the [[Subscription]] is returned,
+ * it in not possible to unsubscribe from the sequence before it completes.
+ *
+ * @param range the range
+ * @return an Observable that emits a range of sequential integers
+ */
def apply(range: Range): Observable[Int] = {
Observable[Int](JObservable.from(range.toIterable.asJava))
}
/**
* Returns an Observable that calls an Observable factory to create its Observable for each
- * new Observer that subscribes. That is, for each subscriber, the actuall Observable is determined
+ * new Observer that subscribes. That is, for each subscriber, the actual Observable is determined
* by the factory function.
*
- *
- *
- *
just
method.
- * just()
method
- * converts an Iterable into an Observable that emits the entire Iterable as a single item.
- *
- * @param value
- * the item to pass to the {@link Observer}'s {@link Observer#onNext onNext} method
- * @param
- *
+ *
+ * @param duration
+ * duration between two consecutive numbers
+ * @return An Observable that emits a number each time interval.
+ */
def interval(duration: Duration): Observable[Long] = {
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit))).map(_.longValue())
}
-
+
+ /**
+ * Emits `0`, `1`, `2`, `...` with a delay of `duration` between consecutive numbers.
+ *
+ *
+ *
+ * @param duration
+ * duration between two consecutive numbers
+ * @param scheduler
+ * the scheduler to use
+ * @return An Observable that emits a number each time interval.
+ */
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = {
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit, scheduler))).map(_.longValue())
}
@@ -1981,7 +2035,7 @@ object Observable {
// Cannot yet have inner class because of this error message:
// "implementation restriction: nested class is not allowed in value class.
// This restriction is planned to be removed in subsequent releases."
-class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <: T]) {
+private[scala] class WithFilter[+T] (p: T => Boolean, asJava: rx.Observable[_ <: T]) {
import rx.lang.scala.ImplicitFunctionConversions._
def map[B](f: T => B): Observable[B] = {
@@ -1999,11 +2053,11 @@ class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <:
// there is no foreach here, that's only available on BlockingObservable
}
-class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
+private[scala] class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
import scala.concurrent.duration._
import org.junit.{Before, Test, Ignore}
import org.junit.Assert._
- import org.mockito.Matchers.any
+ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.{ MockitoAnnotations, Mock }
@@ -2049,7 +2103,7 @@ class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
}
@Test def testFirstOrElse() {
- def mustNotBeCalled: String = error("this method should not be called")
+ def mustNotBeCalled: String = sys.error("this method should not be called")
def mustBeCalled: String = "this is the default value"
assertEquals("hello", Observable("hello").firstOrElse(mustNotBeCalled).toBlockingObservable.single)
assertEquals("this is the default value", Observable().firstOrElse(mustBeCalled).toBlockingObservable.single)
@@ -2066,9 +2120,23 @@ class UnitTestSuite extends org.scalatest.junit.JUnitSuite {
assertEquals(receivedMsg, msg)
}
+ /*
+ @Test def testHead() {
+ val observer = mock(classOf[Observer[Int]])
+ val o = Observable().head
+ val sub = o.subscribe(observer)
+
+ verify(observer, never).onNext(any(classOf[Int]))
+ verify(observer, never).onCompleted()
+ verify(observer, times(1)).onError(any(classOf[NoSuchElementException]))
+ }
+ */
+
@Test def testTest() = {
val a: Observable[Int] = Observable()
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last)
+ println("This UnitTestSuite.testTest() for rx.lang.scala.Observable")
}
}
+
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala
new file mode 100644
index 0000000000..c717a94af5
--- /dev/null
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala
@@ -0,0 +1,161 @@
+package rx.lang.scala
+
+import java.util.Date
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+import org.junit.Before
+import org.junit.Test
+import org.mockito.Matchers.any
+import org.mockito.Mockito.inOrder
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.never
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
+import org.scalatest.junit.JUnitSuite
+
+import rx.lang.scala.ImplicitFunctionConversions.scalaFunction0ProducingUnitToAction0
+import rx.lang.scala.ImplicitFunctionConversions.schedulerActionToFunc2
+import rx.lang.scala.concurrency.TestScheduler
+
+
+/**
+ * Represents an object that schedules units of work.
+ */
+trait Scheduler {
+ def asJava: rx.Scheduler
+
+ /**
+ * Schedules a cancelable action to be executed.
+ *
+ * @param state
+ * State to pass into the action.
+ * @param action
+ * Action to schedule.
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = {
+ asJava.schedule(state, action)
+ }
+
+ /**
+ * Schedules a cancelable action to be executed in delayTime.
+ *
+ * @param state
+ * State to pass into the action.
+ * @param action
+ * Action to schedule.
+ * @param delayTime
+ * Time the action is to be delayed before executing.
+ * @param unit
+ * Time unit of the delay time.
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = {
+ asJava.schedule(state, action, delayTime.length, delayTime.unit)
+ }
+
+ /**
+ * Schedules a cancelable action to be executed periodically.
+ * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
+ * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
+ *
+ * @param state
+ * State to pass into the action.
+ * @param action
+ * The action to execute periodically.
+ * @param initialDelay
+ * Time to wait before executing the action for the first time.
+ * @param period
+ * The time interval to wait each time in between executing the action.
+ * @return A subscription to be able to unsubscribe from action.
+ */
+ def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = {
+ asJava.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)
+ }
+
+ /**
+ * Schedules a cancelable action to be executed at dueTime.
+ *
+ * @param state
+ * State to pass into the action.
+ * @param action
+ * Action to schedule.
+ * @param dueTime
+ * Time the action is to be executed. If in the past it will be executed immediately.
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = {
+ asJava.schedule(state, action, dueTime)
+ }
+
+ /**
+ * Schedules an action to be executed.
+ *
+ * @param action
+ * action
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ def schedule(action: () => Unit): Subscription = {
+ asJava.schedule(action)
+ }
+
+ /**
+ * Schedules an action to be executed in delayTime.
+ *
+ * @param action
+ * action
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ def schedule(action: () => Unit, delayTime: Duration): Subscription = {
+ asJava.schedule(action, delayTime.length, delayTime.unit)
+ }
+
+ /**
+ * Schedules an action to be executed periodically.
+ *
+ * @param action
+ * The action to execute periodically.
+ * @param initialDelay
+ * Time to wait before executing the action for the first time.
+ * @param period
+ * The time interval to wait each time in between executing the action.
+ * @return A subscription to be able to unsubscribe from action.
+ */
+ def schedulePeriodically(action: () => Unit, initialDelay: Duration, period: Duration): Subscription = {
+ asJava.schedulePeriodically(action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)
+ }
+
+ /**
+ * Returns the scheduler's notion of current absolute time in milliseconds.
+ */
+ def now: Long = {
+ asJava.now
+ }
+
+ /**
+ * Parallelism available to a Scheduler.
+ *
+ * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
+ *
+ * @return the scheduler's available degree of parallelism.
+ */
+ def degreeOfParallelism: Int = {
+ asJava.degreeOfParallelism
+ }
+
+}
+
+/**
+ * Provides constructors for Schedulers.
+ */
+object Scheduler {
+ private class WrapJavaScheduler(val asJava: rx.Scheduler) extends Scheduler
+
+ /**
+ * Constructs a Scala Scheduler from a Java Scheduler.
+ */
+ def apply(s: rx.Scheduler): Scheduler = new WrapJavaScheduler(s)
+}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala
new file mode 100644
index 0000000000..8ba88ba2d0
--- /dev/null
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala
@@ -0,0 +1,62 @@
+package rx.lang.scala.concurrency
+
+import java.util.concurrent.Executor
+import java.util.concurrent.ScheduledExecutorService
+import rx.lang.scala.Scheduler
+import rx.lang.scala.ImplicitFunctionConversions._
+
+/**
+ * Factory methods for creating Schedulers.
+ */
+object Schedulers {
+
+ /**
+ * Returns a [[rx.lang.scala.Scheduler]] that executes work immediately on the current thread.
+ */
+ def immediate: Scheduler = rx.concurrency.Schedulers.immediate()
+
+ /**
+ * Returns a [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes.
+ */
+ def currentThread: Scheduler = rx.concurrency.Schedulers.currentThread()
+
+ /**
+ * Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work.
+ */
+ def newThread: Scheduler = rx.concurrency.Schedulers.newThread
+
+ /**
+ * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.Executor`.
+ *
+ * Note that this does not support scheduled actions with a delay.
+ */
+ def executor(executor: Executor): Scheduler = rx.concurrency.Schedulers.executor(executor)
+
+ /**
+ * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.ScheduledExecutorService`.
+ */
+ def executor(executor: ScheduledExecutorService): Scheduler = rx.concurrency.Schedulers.executor(executor)
+
+ /**
+ * Returns a [[rx.lang.scala.Scheduler]] intended for computational work.
+ *
+ * The implementation is backed by a `java.util.concurrent.ScheduledExecutorService` thread-pool sized to the number of CPU cores.
+ *
+ * This can be used for event-loops, processing callbacks and other computational work.
+ *
+ * Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForIO]] instead.
+ */
+ def threadPoolForComputation: Scheduler = rx.concurrency.Schedulers.threadPoolForComputation()
+
+ /**
+ * [[rx.lang.scala.Scheduler]] intended for IO-bound work.
+ *
+ * The implementation is backed by an `java.util.concurrent.Executor` thread-pool that will grow as needed.
+ *
+ * This can be used for asynchronously performing blocking IO.
+ *
+ * Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation]] instead.
+ */
+ def threadPoolForIO: Scheduler = rx.concurrency.Schedulers.threadPoolForIO()
+
+}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala
new file mode 100644
index 0000000000..7023ca2f4e
--- /dev/null
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala
@@ -0,0 +1,105 @@
+package rx.lang.scala.concurrency
+
+import scala.concurrent.duration.Duration
+import rx.lang.scala.Scheduler
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Scheduler with artificial time, useful for testing.
+ *
+ * For example, you could test the `Observable.interval` operation using a `TestScheduler` as follows:
+ *
+ * {{{
+ * @Test def testInterval() {
+ * import org.mockito.Matchers._
+ * import org.mockito.Mockito._
+ *
+ * val scheduler = TestScheduler()
+ * val observer = mock(classOf[rx.Observer[Long]])
+ *
+ * val o = Observable.interval(1 second, scheduler)
+ * val sub = o.subscribe(observer)
+ *
+ * verify(observer, never).onNext(0L)
+ * verify(observer, never).onCompleted()
+ * verify(observer, never).onError(any(classOf[Throwable]))
+ *
+ * scheduler.advanceTimeTo(2 seconds)
+ *
+ * val inOrdr = inOrder(observer);
+ * inOrdr.verify(observer, times(1)).onNext(0L)
+ * inOrdr.verify(observer, times(1)).onNext(1L)
+ * inOrdr.verify(observer, never).onNext(2L)
+ * verify(observer, never).onCompleted()
+ * verify(observer, never).onError(any(classOf[Throwable]))
+ *
+ * sub.unsubscribe();
+ * scheduler.advanceTimeTo(4 seconds)
+ * verify(observer, never).onNext(2L)
+ * verify(observer, times(1)).onCompleted()
+ * verify(observer, never).onError(any(classOf[Throwable]))
+ * }
+ * }}}
+ */
+class TestScheduler extends Scheduler {
+ val asJava = new rx.concurrency.TestScheduler
+
+ def advanceTimeBy(time: Duration) {
+ asJava.advanceTimeBy(time.length, time.unit)
+ }
+
+ def advanceTimeTo(time: Duration) {
+ asJava.advanceTimeTo(time.length, time.unit)
+ }
+
+ def triggerActions() {
+ asJava.triggerActions()
+ }
+}
+
+/**
+ * Provides constructors for `TestScheduler`.
+ */
+object TestScheduler {
+ def apply(): TestScheduler = {
+ new TestScheduler
+ }
+}
+
+private class UnitTest extends JUnitSuite {
+ import org.junit.Test
+ import scala.concurrent.duration._
+ import scala.language.postfixOps
+ import rx.lang.scala.{Observable, Observer}
+
+ @Test def testInterval() {
+ import org.mockito.Matchers._
+ import org.mockito.Mockito._
+
+ val scheduler = TestScheduler()
+ val observer = mock(classOf[rx.Observer[Long]])
+
+ val o = Observable.interval(1 second, scheduler)
+ val sub = o.subscribe(observer)
+
+ verify(observer, never).onNext(0L)
+ verify(observer, never).onCompleted()
+ verify(observer, never).onError(any(classOf[Throwable]))
+
+ scheduler.advanceTimeTo(2 seconds)
+
+ val inOrdr = inOrder(observer);
+ inOrdr.verify(observer, times(1)).onNext(0L)
+ inOrdr.verify(observer, times(1)).onNext(1L)
+ inOrdr.verify(observer, never).onNext(2L)
+ verify(observer, never).onCompleted()
+ verify(observer, never).onError(any(classOf[Throwable]))
+
+ sub.unsubscribe();
+ scheduler.advanceTimeTo(4 seconds)
+ verify(observer, never).onNext(2L)
+ verify(observer, times(1)).onCompleted()
+ verify(observer, never).onError(any(classOf[Throwable]))
+ }
+}
+
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala
index 9dabd3356b..a3e61c0021 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala
@@ -17,18 +17,15 @@ package rx.lang.scala
import rx.concurrency.CurrentThreadScheduler
-package object concurrency {
- /*
- TODO
- rx.concurrency.CurrentThreadScheduler
- rx.concurrency.ExecutorScheduler
- rx.concurrency.ImmediateScheduler
- rx.concurrency.NewThreadScheduler
- rx.concurrency.Schedulers
- rx.concurrency.TestScheduler
+/**
+ * Provides schedulers.
*/
-
- lazy val CurrentThreadScheduler = rx.concurrency.CurrentThreadScheduler.getInstance()
- lazy val NewThreadScheduler = rx.concurrency.NewThreadScheduler.getInstance()
+package object concurrency {
-}
\ No newline at end of file
+ // These classes are not exposed to Scala users, but are accessible through rx.lang.scala.concurrency.Schedulers:
+
+ // rx.concurrency.CurrentThreadScheduler
+ // rx.concurrency.ExecutorScheduler
+ // rx.concurrency.ImmediateScheduler
+ // rx.concurrency.NewThreadScheduler
+}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
index 5470f6f1cb..8d9323ba32 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
@@ -18,19 +18,25 @@ package rx.lang.scala.observables
import scala.collection.JavaConverters._
import rx.lang.scala.ImplicitFunctionConversions._
-class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <: T])
+/**
+ * An Observable that provides blocking operators.
+ *
+ * You can obtain a BlockingObservable from an Observable using [[Observable.toBlockingObservable]]
+ */
+// constructor is private because users should use Observable.toBlockingObservable
+class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
extends AnyVal
{
/**
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
* completes.
- *
*
* @param onNext
@@ -41,6 +47,10 @@ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <:
def foreach(f: T => Unit): Unit = {
asJava.forEach(f);
}
+
+ def withFilter(p: T => Boolean): WithFilter[T] = {
+ new WithFilter[T](p, asJava)
+ }
// last -> use toIterable.last
// lastOrDefault -> use toIterable.lastOption
@@ -118,3 +128,23 @@ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <:
}
}
+
+// Cannot yet have inner class because of this error message:
+// "implementation restriction: nested class is not allowed in value class.
+// This restriction is planned to be removed in subsequent releases."
+private[observables] class WithFilter[+T] (p: T => Boolean, asJava: rx.observables.BlockingObservable[_ <: T]) {
+ import rx.lang.scala.ImplicitFunctionConversions._
+
+ // there's no map and flatMap here, they're only available on Observable
+
+ def withFilter(q: T => Boolean) = new WithFilter[T]((x: T) => p(x) && q(x), asJava)
+
+ def foreach(f: T => Unit): Unit = {
+ asJava.forEach((e: T) => {
+ if (p(e)) f(e)
+ })
+ }
+
+}
+
+
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala
new file mode 100644
index 0000000000..8507f0a54c
--- /dev/null
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala
@@ -0,0 +1,12 @@
+package rx.lang.scala
+
+/**
+ * Contains special Observables.
+ *
+ * In Scala, this package only contains [[BlockingObservable]].
+ * In the corresponding Java package `rx.observables`, there is also a
+ * `GroupedObservable` and a `ConnectableObservable`, but these are not needed
+ * in Scala, because we use a pair `(key, observable)` instead of `GroupedObservable`
+ * and a pair `(startFunction, observable)` instead of `ConnectableObservable`.
+ */
+package object observables {}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala
index 0f6ea79d34..8aa0e63760 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala
@@ -15,39 +15,137 @@
*/
package rx.lang
+import java.util.concurrent.TimeUnit
+import java.util.Date
-/*
- * This object contains aliases to all types Scala users need to import.
+/*
* Note that:
- * - Scala users cannot use Java's type with variance without always using writing
+ * - Scala users cannot use Java's types with variance without always using writing
* e.g. rx.Notification[_ <: T], so we create aliases fixing the variance
- * - For consistency, we create aliases for all types
- * - Type aliases cannot be at top level, they have to be inside an object or class
+ * - For consistency, we create aliases for all types which Scala users need
+ */
+
+/**
+ * This package contains all classes that RxScala users need.
+ *
+ * It mirrors the structure of package `rx`, but implementation classes that RxScala users
+ * will not need are left out.
*/
package object scala {
- type Notification[+T] = rx.Notification[_ <: T]
- object Notification {
- def apply[T](): Notification[T] = new rx.Notification()
- def apply[T](value: T): Notification[T] = new rx.Notification(value)
- def apply[T](t: Throwable): Notification[T] = new rx.Notification(t)
+ /*
+ * Here we're imitating C's preprocessor using Search & Replace.
+ *
+ * To activate the code needed to get nice Scaladoc, do the following replacements:
+ * /*//#ifdef SCALADOC --> //#ifdef SCALADOC
+ * *///#else --> /*//#else
+ * //#endif --> *///#endif
+ *
+ * To get back to the actual code, undo the above replacements.
+ *
+ */
+
+ /*//#ifdef SCALADOC
+
+ /**
+ * Provides a mechanism for receiving push-based notifications.
+ *
+ * After an Observer calls an [[Observable]]'s `subscribe` method, the Observable
+ * calls the Observer's `onNext` method to provide notifications. A well-behaved Observable will
+ * call an Observer's `onCompleted` method exactly once or the Observer's `onError` method exactly once.
+ */
+ trait Observer[-T] {
+
+ /**
+ * Notifies the Observer that the [[Observable]] has finished sending push-based notifications.
+ *
+ * The [[Observable]] will not call this method if it calls `onError`.
+ */
+ def onCompleted(): Unit
+
+ /**
+ * Notifies the Observer that the [[Observable]] has experienced an error condition.
+ *
+ * If the [[Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
+ */
+ def onError(e: Throwable): Unit
+
+ /**
+ * Provides the Observer with new data.
+ *
+ * The [[Observable]] calls this closure 0 or more times.
+ *
+ * The [[Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
+ */
+ def onNext(arg: T): Unit
+ }
+
+ /**
+ * Subscriptions are returned from all `Observable.subscribe` methods to allow unsubscribing.
+ *
+ * This interface is the equivalent of `IDisposable` in the .NET Rx implementation.
+ */
+ trait Subscription {
+ /**
+ * Call this method to stop receiving notifications on the Observer that was registered when
+ * this Subscription was received.
+ */
+ def unsubscribe(): Unit
}
- type Observer[-T] = rx.Observer[_ >: T]
- type Scheduler = rx.Scheduler
- type Subscription = rx.Subscription
+ import language.implicitConversions
+
+ private[scala] implicit def fakeSubscription2RxSubscription(s: Subscription): rx.Subscription =
+ new rx.Subscription {
+ def unsubscribe() = s.unsubscribe()
+ }
+ private[scala] implicit def rxSubscription2FakeSubscription(s: rx.Subscription): Subscription =
+ new Subscription {
+ def unsubscribe() = s.unsubscribe()
+ }
+
+ private[scala] implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
+ new rx.util.functions.Func2[rx.Scheduler, T, rx.Subscription] {
+ def call(s: rx.Scheduler, t: T): rx.Subscription = {
+ action(ImplicitFunctionConversions.javaSchedulerToScalaScheduler(s), t)
+ }
+ }
+ private[scala] implicit def fakeObserver2RxObserver[T](o: Observer[T]): rx.Observer[_ >: T] = ???
+ private[scala] implicit def rxObserver2fakeObserver[T](o: rx.Observer[_ >: T]): Observer[T] = ???
+
+ *///#else
+
+ type Observer[-T] = rx.Observer[_ >: T]
+
+ type Subscription = rx.Subscription
+
+ //#endif
+
+ /**
+ * Allows to construct observables in a similar way as futures.
+ *
+ * Example:
+ *
+ * {{{
+ * implicit val scheduler = Schedulers.threadPoolForIO
+ * val o: Observable[List[Friend]] = observable {
+ * session.getFriends
+ * }
+ * o.subscribe(
+ * friendList => println(friendList),
+ * err => println(err.getMessage)
+ * )
+ * }}}
+ */
+ def observable[T](body: => T)(implicit scheduler: Scheduler): Observable[T] = {
+ Observable(1).observeOn(scheduler).map(_ => body)
+ }
}
/*
-TODO make aliases for these types because:
-* those which are covariant or contravariant do need an alias to get variance correct
-* the others for consistency
-
-rx.observables.BlockingObservable
-rx.observables.ConnectableObservable
-rx.observables.GroupedObservable
+These classes are considered unnecessary for Scala users, so we don't create aliases for them:
rx.plugins.RxJavaErrorHandler
rx.plugins.RxJavaObservableExecutionHook
@@ -58,4 +156,3 @@ rx.subscriptions.CompositeSubscription
rx.subscriptions.Subscriptions
*/
-
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala
index 8f99d02bf6..ec096e92eb 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala
@@ -1,11 +1,28 @@
package rx.lang.scala
+/**
+ * Provides the type `Subject`.
+ */
package object subjects {
- // in Java: public abstract class Subject
" + name + "(...)
"
} else {
diff --git a/settings.gradle b/settings.gradle
index 8750fab727..32cef40c9b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -3,7 +3,6 @@ include 'rxjava-core', \
'language-adaptors:rxjava-groovy', \
'language-adaptors:rxjava-clojure', \
'language-adaptors:rxjava-scala', \
-'language-adaptors:rxjava-scala-java', \
'rxjava-contrib:rxjava-swing', \
'rxjava-contrib:rxjava-android', \
'rxjava-contrib:rxjava-apache-http'