2.x: different behavior of throttleLast() (compared to 1.x) #5516

Closed
@wychi

Hello there,

I am not sure whether this is a bug, but I get different results from throttleLast() in 1.x and 2.x.

Here are my test results. As you can see, the two versions differ in how they handle onNext(5):
the value 5, still pending when the subject completes, is emitted in 1.x but dropped in 2.x.

rxjava 1.x throttleLast()
emit onNext 0
emit onNext 0 end
progress 0
emit onNext 1
emit onNext 1 end
progress 1
emit onNext 2
emit onNext 2 end
progress 2
emit onNext 3
emit onNext 3 end
progress 3
emit onNext 4
emit onNext 4 end
progress 4
emit onNext 5
emit onNext 5 end
emit onComplete 
emit onComplete - end 
wait before terminate
progress 5
doOnComplete
emit onNext 6
emit onNext 6 end
emit onNext 7
emit onNext 7 end
emit onNext 8
emit onNext 8 end
emit onNext 9
emit onNext 9 end
emit onNext 10
emit onNext 10 end
emit onNext 11
emit onNext 11 end
emit onNext 12
emit onNext 12 end
emit onNext 13
emit onNext 13 end
emit onNext 14
emit onNext 14 end

rxjava 2.x throttleLast()
emit onNext 0
emit onNext 0 end
progress 0
emit onNext 1
emit onNext 1 end
progress 1
emit onNext 2
emit onNext 2 end
progress 2
emit onNext 3
emit onNext 3 end
progress 3
emit onNext 4
emit onNext 4 end
progress 4
emit onNext 5
emit onNext 5 end
emit onComplete 
emit onComplete - end 
doOnComplete
wait before terminate
emit onNext 6
emit onNext 6 end
emit onNext 7
emit onNext 7 end
emit onNext 8
emit onNext 8 end
emit onNext 9
emit onNext 9 end
emit onNext 10
emit onNext 10 end
emit onNext 11
emit onNext 11 end
emit onNext 12
emit onNext 12 end
emit onNext 13
emit onNext 13 end
emit onNext 14
emit onNext 14 end

Here is my test code:

    compile "io.reactivex.rxjava2:rxjava:2.1.1"
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex:rxjava:1.3.0'

package allstar.wychi.allstardemo;


import org.junit.Test;

import java.util.concurrent.TimeUnit;

import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

public class ExampleUnitTest {

    @Test
    public void test1x() {
        final rx.subjects.PublishSubject<Integer> subject = rx.subjects.PublishSubject.create();

        System.out.println("rxjava 1.x throttleLast()");
        subject
                .throttleLast(60, TimeUnit.MILLISECONDS)
                .observeOn(rx.schedulers.Schedulers.io())
                .doOnNext(new rx.functions.Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("progress " + integer);
                        try {
                            Thread.sleep(16);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .doOnError(new rx.functions.Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("doOnError " + throwable.getMessage());
                    }
                })
                .doOnCompleted(new rx.functions.Action0() {
                    @Override
                    public void call() {
                        System.out.println("doOnComplete");
                    }
                })
                .subscribe();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0; i<100; i++) {

                    System.out.println("emit onNext " + i);
                    subject.onNext(i);
                    System.out.println("emit onNext " + i + " end");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

        try {
            Thread.sleep(520);
            System.out.println("emit onComplete ");
            subject.onCompleted();
            System.out.println("emit onComplete - end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("wait before terminate");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test2x() {
        System.out.println("rxjava 2.x throttleLast()");

        final PublishSubject<Integer> subject = PublishSubject.create();

        subject
                .throttleLast(60, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("progress " + integer);
                        try {
                            Thread.sleep(16);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println("doOnError " + throwable.getMessage());
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("doOnComplete");
                    }
                })
                .subscribe();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0; i<100; i++) {

                    System.out.println("emit onNext " + i);
                    subject.onNext(i);
                    System.out.println("emit onNext " + i + " end");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

        try {
            Thread.sleep(520);
            System.out.println("emit onComplete ");
            subject.onComplete();
            System.out.println("emit onComplete - end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("wait before terminate");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

I just want to know which behavior is expected.
Thanks.
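
The difference reported above can be reproduced without timing noise in a small plain-Java model. This is only a sketch: `ThrottleLastModel`, `simulate`, and the `emitLastOnComplete` flag are hypothetical names, and the code models the logged behavior rather than RxJava's actual implementation. It assumes values 0, 1, 2, ... arrive every 100 ms, the sampler ticks every 60 ms, and completion happens at 520 ms, as in the test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a periodic "keep the latest value" sampler.
// It is NOT RxJava code; it only mimics the behavior seen in the logs.
class ThrottleLastModel {

    // Values 0..count-1 arrive at times 0, emitEveryMs, 2*emitEveryMs, ...
    // The sampler ticks at periodMs, 2*periodMs, ... and completion occurs at completeAtMs.
    // A tick is processed only if it falls strictly before the next value or the completion.
    static List<Integer> simulate(int count, long emitEveryMs, long periodMs,
                                  long completeAtMs, boolean emitLastOnComplete) {
        List<Integer> delivered = new ArrayList<>();
        Integer pending = null;   // latest value not yet delivered
        long nextTick = periodMs;

        for (int i = 0; i < count && i * emitEveryMs < completeAtMs; i++) {
            long arrival = i * emitEveryMs;
            // Fire every tick that happens before this value arrives.
            while (nextTick < arrival) {
                if (pending != null) {
                    delivered.add(pending);
                    pending = null;
                }
                nextTick += periodMs;
            }
            pending = i;          // a newer value replaces the older one
        }

        // Fire the remaining ticks that happen before completion.
        while (nextTick < completeAtMs) {
            if (pending != null) {
                delivered.add(pending);
                pending = null;
            }
            nextTick += periodMs;
        }

        // The only difference between the two modeled behaviors:
        // flush the pending value on completion, or drop it.
        if (emitLastOnComplete && pending != null) {
            delivered.add(pending);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Models the 1.x log: the pending value is flushed on completion.
        System.out.println(simulate(10, 100, 60, 520, true));   // prints [0, 1, 2, 3, 4, 5]
        // Models the 2.x log: the pending value is dropped on completion.
        System.out.println(simulate(10, 100, 60, 520, false));  // prints [0, 1, 2, 3, 4]
    }
}
```

With these timings, value 5 arrives at 500 ms, but the next tick would only fire at 540 ms, after completion at 520 ms. Whether 5 is delivered therefore depends solely on whether the pending value is flushed on completion, which is consistent with the two logs.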
