-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTestBiSubscriber.java
More file actions
257 lines (231 loc) · 8.32 KB
/
TestBiSubscriber.java
File metadata and controls
257 lines (231 loc) · 8.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package rx.observers;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.BiObserver;
import rx.BiSubscriber;
import rx.Notification;
import rx.Subscriber;
public class TestBiSubscriber<T0, T1> extends BiSubscriber<T0, T1> {
private final TestBiObserver<T0, T1> testObserver;
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Thread lastSeenThread;
public TestBiSubscriber(BiSubscriber<T0, T1> delegate) {
this.testObserver = new TestBiObserver<T0, T1>(delegate);
}
public TestBiSubscriber(BiObserver<T0, T1> delegate) {
this.testObserver = new TestBiObserver<T0, T1>(delegate);
}
public TestBiSubscriber() {
this.testObserver = new TestBiObserver<T0, T1>(new BiObserver<T0, T1>() {
@Override
public void onComplete() {
// do nothing
}
@Override
public void onError(Throwable e) {
// do nothing
}
@Override
public void onNext(T0 t0, T1 t1) {
// do nothing
}
});
}
/**
* Notifies the Subscriber that the {@code Observable} has finished sending push-based
* notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
@Override
public void onComplete() {
try {
lastSeenThread = Thread.currentThread();
testObserver.onComplete();
} finally {
latch.countDown();
}
}
/**
* Get the {@link Notification}s representing each time this {@link Subscriber} was notified of
* sequence completion via {@link #onCompleted}, as a {@link List}.
*
* @return a list of Notifications representing calls to this Subscriber's {@link #onCompleted}
* method
*/
public List<BiNotification<T0, T1>> getOnCompletedEvents() {
return testObserver.getOnCompletedEvents();
}
/**
* Notifies the Subscriber that the {@code Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
@Override
public void onError(Throwable e) {
try {
lastSeenThread = Thread.currentThread();
testObserver.onError(e);
} finally {
latch.countDown();
}
}
/**
* Get the {@link Throwable}s this {@link Subscriber} was notified of via {@link #onError} as a
* {@link List}.
*
* @return a list of the Throwables that were passed to this Subscriber's {@link #onError}
* method
*/
public List<Throwable> getOnErrorEvents() {
return testObserver.getOnErrorEvents();
}
/**
* Provides the Subscriber with a new item to observe.
* <p>
* The {@code Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either
* {@link #onCompleted} or {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
@Override
public void onNext(T0 t0, T1 t1) {
lastSeenThread = Thread.currentThread();
testObserver.onNext(t0, t1);
}
/**
* Allow calling the protected {@link #request(long)} from unit tests.
*
* @param n
* the maximum number of items you want the Observable to emit to the Subscriber at
* this time, or {@code Long.MAX_VALUE} if you want the Observable to emit items at
* its own pace
*/
public void requestMore(long n) {
request(n);
}
/**
* Get the sequence of items observed by this {@link Subscriber}, as an ordered {@link List}.
*
* @return a list of items observed by this Subscriber, in the order in which they were observed
*/
public List<TestEvent<T0, T1>> getOnNextEvents() {
return testObserver.getOnNextEvents();
}
/**
* Assert that a particular sequence of items was received by this {@link Subscriber} in order.
*
* @param items
* the sequence of items expected to have been observed
* @throws AssertionError
* if the sequence of items observed does not exactly match {@code items}
*/
public void assertReceivedOnNext(List<TestEvent<T0, T1>> items) {
testObserver.assertReceivedOnNext(items);
}
/**
* Assert that a single terminal event occurred, either {@link #onCompleted} or {@link #onError}
* .
*
* @throws AssertionError
* if not exactly one terminal event notification was received
*/
public void assertTerminalEvent() {
testObserver.assertTerminalEvent();
}
/**
* Assert that this {@code Subscriber} is unsubscribed.
*
* @throws AssertionError
* if this {@code Subscriber} is not unsubscribed
*/
public void assertUnsubscribed() {
if (!isUnsubscribed()) {
throw new AssertionError("Not unsubscribed.");
}
}
/**
* Assert that this {@code Subscriber} has received no {@code onError} notifications.
*
* @throws AssertionError
* if this {@code Subscriber} has received one or more {@code onError} notifications
*/
public void assertNoErrors() {
if (getOnErrorEvents().size() > 0) {
// can't use AssertionError because (message, cause) doesn't exist until Java 7
throw new RuntimeException("Unexpected onError events: " + getOnErrorEvents()
.size(), getOnErrorEvents().get(0));
// TODO possibly check for Java7+ and then use AssertionError at runtime (since we
// always compile with 7)
}
}
/**
* Blocks until this {@link Subscriber} receives a notification that the {@code Observable} is
* complete (either an {@code onCompleted} or {@code onError} notification).
*
* @throws RuntimeException
* if the Subscriber is interrupted before the Observable is able to complete
*/
public void awaitTerminalEvent() {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
/**
* Blocks until this {@link Subscriber} receives a notification that the {@code Observable} is
* complete (either an {@code onCompleted} or {@code onError} notification), or until a timeout
* expires.
*
* @param timeout
* the duration of the timeout
* @param unit
* the units in which {@code timeout} is expressed
* @throws RuntimeException
* if the Subscriber is interrupted before the Observable is able to complete
*/
public void awaitTerminalEvent(long timeout, TimeUnit unit) {
try {
latch.await(timeout, unit);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
/**
* Blocks until this {@link Subscriber} receives a notification that the {@code Observable} is
* complete (either an {@code onCompleted} or {@code onError} notification), or until a timeout
* expires; if the Subscriber is interrupted before either of these events take place, this
* method unsubscribes the Subscriber from the Observable).
*
* @param timeout
* the duration of the timeout
* @param unit
* the units in which {@code timeout} is expressed
*/
public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit unit) {
try {
awaitTerminalEvent(timeout, unit);
} catch (RuntimeException e) {
unsubscribe();
}
}
/**
* Returns the last thread that was in use when an item or notification was received by this
* {@link Subscriber}.
*
* @return the {@code Thread} on which this Subscriber last received an item or notification
* from the Observable it is subscribed to
*/
public Thread getLastSeenThread() {
return lastSeenThread;
}
}