Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4127,7 +4127,7 @@ public final Observable<T> delay(long delay, TimeUnit unit) {
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorDelay<T>(this, delay, unit, scheduler));
return lift(new OperatorDelay<T>(delay, unit, scheduler));
}

/**
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.annotations.Experimental;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
Expand All @@ -32,6 +33,7 @@
import rx.functions.Func8;
import rx.functions.Func9;
import rx.internal.operators.OnSubscribeToObservableFuture;
import rx.internal.operators.OperatorDelay;
import rx.internal.operators.OperatorDoOnEach;
import rx.internal.operators.OperatorMap;
import rx.internal.operators.OperatorObserveOn;
Expand Down Expand Up @@ -1898,4 +1900,50 @@ public void onNext(T t) {

return lift(new OperatorDoOnEach<T>(observer));
}

/**
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
* specified delay. Error notifications from the source Single are not delayed.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param delay
* the delay to shift the source by
* @param unit
* the time unit of {@code delay}
* @param scheduler
* the {@link Scheduler} to use for delaying
* @return the source Single shifted in time by the specified delay
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@Experimental
public final Single<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorDelay<T>(delay, unit, scheduler));
}

/**
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
* specified delay. Error notifications from the source Observable are not delayed.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.</dd>
* </dl>
*
* @param delay
* the delay to shift the source by
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @return the source Single shifted in time by the specified delay
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@Experimental
public final Single<T> delay(long delay, TimeUnit unit) {
return delay(delay, unit, Schedulers.computation());
}
}
4 changes: 1 addition & 3 deletions src/main/java/rx/internal/operators/OperatorDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
*/
public final class OperatorDelay<T> implements Operator<T, T> {

final Observable<? extends T> source;
final long delay;
final TimeUnit unit;
final Scheduler scheduler;

public OperatorDelay(Observable<? extends T> source, long delay, TimeUnit unit, Scheduler scheduler) {
this.source = source;
public OperatorDelay(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
Expand Down
45 changes: 43 additions & 2 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.TestScheduler;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;


public class SingleTest {

@Test
Expand Down Expand Up @@ -436,7 +438,7 @@ public void call() {
fail("timed out waiting for latch");
}
}

@Test
public void testBackpressureAsObservable() {
Single<String> s = Single.create(new OnSubscribe<String>() {
Expand All @@ -462,7 +464,7 @@ public void onStart() {

ts.assertValue("hello");
}

@Test
public void testToObservable() {
Observable<String> a = Single.just("a").toObservable();
Expand Down Expand Up @@ -648,4 +650,43 @@ public void doOnSuccessShouldNotSwallowExceptionThrownByAction() {

verify(action).call(eq("value"));
}

@Test
public void delayWithSchedulerShouldDelayCompletion() {
TestScheduler scheduler = new TestScheduler();
Single<Integer> single = Single.just(1).delay(100, TimeUnit.DAYS, scheduler);

TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
single.subscribe(subscriber);

subscriber.assertNotCompleted();
scheduler.advanceTimeBy(99, TimeUnit.DAYS);
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(91, TimeUnit.DAYS);
subscriber.assertCompleted();
subscriber.assertValue(1);
}

@Test
public void delayWithSchedulerShouldShortCutWithFailure() {
TestScheduler scheduler = new TestScheduler();
final RuntimeException expected = new RuntimeException();
Single<Integer> single = Single.create(new OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
singleSubscriber.onSuccess(1);
singleSubscriber.onError(expected);
}
}).delay(100, TimeUnit.DAYS, scheduler);

TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
single.subscribe(subscriber);

subscriber.assertNotCompleted();
scheduler.advanceTimeBy(99, TimeUnit.DAYS);
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(91, TimeUnit.DAYS);
subscriber.assertNoValues();
subscriber.assertError(expected);
}
}