forked from ReactiveX/RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMathObservable.java
More file actions
382 lines (356 loc) · 21.3 KB
/
MathObservable.java
File metadata and controls
382 lines (356 loc) · 21.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;
import java.util.Comparator;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Functions;
import rx.math.operators.OperatorMinMax;
import rx.math.operators.OperatorSum;
import rx.math.operators.OperatorAverageDouble;
import rx.math.operators.OperatorAverageFloat;
import rx.math.operators.OperatorAverageInteger;
import rx.math.operators.OperatorAverageLong;
/**
 * Offers mathematical aggregate operators (average, sum, minimum, maximum) for {@link Observable}
 * sequences.
 * <p>
 * The static methods take the source Observable as an argument and work on its items directly. The
 * instance methods work on the Observable handed to {@link #from(Observable)} and accept either a
 * function that extracts a number from each item or a {@link Comparator} that orders the items.
 * <p>
 * All computation is delegated to the operator classes in {@code rx.math.operators}; this class adds
 * no logic of its own beyond that delegation.
 * <p>
 * NOTE(review): only some methods document an {@code IllegalArgumentException} for an empty source
 * (for example {@link #averageInteger(Observable)} does while the other static average methods do
 * not). The empty-source behavior is decided by the delegated operators, which are not visible from
 * this class; confirm against them before relying on either behavior.
 *
 * @param <T>
 *            the type of the items emitted by the wrapped Observable
 */
public class MathObservable<T> {

    /** The Observable that the instance methods aggregate over. */
    private final Observable<T> o;

    /**
     * Creates a wrapper around the given Observable; use {@link #from(Observable)} to obtain instances.
     *
     * @param o
     *            the Observable to aggregate over
     */
    private MathObservable(Observable<T> o) {
        this.o = o;
    }

    /**
     * Wraps the given Observable so that the instance aggregate operators of this class can be applied
     * to it.
     *
     * @param <T>
     *            the type of the items emitted by the Observable
     * @param o
     *            the Observable to wrap
     * @return a new MathObservable that operates on {@code o}
     * @throws NullPointerException
     *             if {@code o} is null
     */
    public static <T> MathObservable<T> from(Observable<T> o) {
        // Fail fast here instead of with a harder-to-trace NullPointerException once an
        // instance method dereferences the wrapped Observable.
        if (o == null) {
            throw new NullPointerException("o");
        }
        return new MathObservable<T>(o);
    }

    /**
     * Returns an Observable that emits the average of the Doubles emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.png" alt="">
     *
     * @param source
     *            source Observable to compute the average of
     * @return an Observable that emits a single item: the average of all the Doubles emitted by the source
     *         Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageDouble()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public static final Observable<Double> averageDouble(Observable<Double> source) {
        return source.lift(new OperatorAverageDouble<Double>(Functions.<Double>identity()));
    }

    /**
     * Returns an Observable that emits the average of the Floats emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.png" alt="">
     *
     * @param source
     *            source Observable to compute the average of
     * @return an Observable that emits a single item: the average of all the Floats emitted by the source
     *         Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageFloat()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public static final Observable<Float> averageFloat(Observable<Float> source) {
        return source.lift(new OperatorAverageFloat<Float>(Functions.<Float>identity()));
    }

    /**
     * Returns an Observable that emits the average of the Integers emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.png" alt="">
     *
     * @param source
     *            source Observable to compute the average of
     * @return an Observable that emits a single item: the average of all the Integers emitted by the source
     *         Observable
     * @throws IllegalArgumentException
     *             if the source Observable emits no items
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageInteger()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public static final Observable<Integer> averageInteger(Observable<Integer> source) {
        return source.lift(new OperatorAverageInteger<Integer>(Functions.<Integer>identity()));
    }

    /**
     * Returns an Observable that emits the average of the Longs emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.png" alt="">
     *
     * @param source
     *            source Observable to compute the average of
     * @return an Observable that emits a single item: the average of all the Longs emitted by the source
     *         Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageLong()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public static final Observable<Long> averageLong(Observable<Long> source) {
        return source.lift(new OperatorAverageLong<Long>(Functions.<Long>identity()));
    }

    /**
     * Returns an Observable that emits the single item emitted by the source Observable that is the
     * maximum according to the items' natural ordering ({@link Comparable}). If there is more than one
     * item with the same maximum value, it emits the last-emitted of these.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/max.png" alt="">
     *
     * @param <T>
     *            the type of the items, which must be mutually comparable
     * @param source
     *            an Observable to scan for the maximum emitted item
     * @return an Observable that emits this maximum item
     * @throws IllegalArgumentException
     *             if the source is empty
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-max">RxJava Wiki: max()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/hh211837.aspx">MSDN: Observable.Max</a>
     */
    public static final <T extends Comparable<? super T>> Observable<T> max(Observable<T> source) {
        return OperatorMinMax.max(source);
    }

    /**
     * Returns an Observable that emits the single item emitted by the source Observable that is the
     * minimum according to the items' natural ordering ({@link Comparable}). If there is more than one
     * such item, it returns the last-emitted one.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/min.png" alt="">
     *
     * @param <T>
     *            the type of the items, which must be mutually comparable
     * @param source
     *            an Observable to determine the minimum item of
     * @return an Observable that emits the minimum item emitted by the source Observable
     * @throws IllegalArgumentException
     *             if the source is empty
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-min">RxJava Wiki: min()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/hh229715.aspx">MSDN: Observable.Min</a>
     */
    public static final <T extends Comparable<? super T>> Observable<T> min(Observable<T> source) {
        return OperatorMinMax.min(source);
    }

    /**
     * Returns an Observable that emits the sum of all the Doubles emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.png" alt="">
     *
     * @param source
     *            the source Observable to compute the sum of
     * @return an Observable that emits a single item: the sum of all the Doubles emitted by the source
     *         Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumDouble()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public static final Observable<Double> sumDouble(Observable<Double> source) {
        return OperatorSum.sumDoubles(source);
    }

    /**
     * Returns an Observable that emits the sum of all the Floats emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.png" alt="">
     *
     * @param source
     *            the source Observable to compute the sum of
     * @return an Observable that emits a single item: the sum of all the Floats emitted by the source
     *         Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumFloat()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public static final Observable<Float> sumFloat(Observable<Float> source) {
        return OperatorSum.sumFloats(source);
    }

    /**
     * Returns an Observable that emits the sum of all the Integers emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.png" alt="">
     *
     * @param source
     *            source Observable to compute the sum of
     * @return an Observable that emits a single item: the sum of all the Integers emitted by the source
     *         Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumInteger()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public static final Observable<Integer> sumInteger(Observable<Integer> source) {
        return OperatorSum.sumIntegers(source);
    }

    /**
     * Returns an Observable that emits the sum of all the Longs emitted by the source Observable.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.png" alt="">
     *
     * @param source
     *            source Observable to compute the sum of
     * @return an Observable that emits a single item: the sum of all the Longs emitted by the
     *         source Observable
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumLong()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public static final Observable<Long> sumLong(Observable<Long> source) {
        return OperatorSum.sumLongs(source);
    }

    /**
     * Returns an Observable that transforms items emitted by the source Observable into Doubles by using a
     * function you provide and then emits the Double average of the complete sequence of transformed values.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to transform an item emitted by the source Observable into a Double
     * @return an Observable that emits a single item: the Double average of the complete sequence of items
     *         emitted by the source Observable when transformed into Doubles by the specified function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageDouble()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public final Observable<Double> averageDouble(Func1<? super T, Double> valueExtractor) {
        return o.lift(new OperatorAverageDouble<T>(valueExtractor));
    }

    /**
     * Returns an Observable that transforms items emitted by the source Observable into Floats by using a
     * function you provide and then emits the Float average of the complete sequence of transformed values.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to transform an item emitted by the source Observable into a Float
     * @return an Observable that emits a single item: the Float average of the complete sequence of items
     *         emitted by the source Observable when transformed into Floats by the specified function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageFloat()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public final Observable<Float> averageFloat(Func1<? super T, Float> valueExtractor) {
        return o.lift(new OperatorAverageFloat<T>(valueExtractor));
    }

    /**
     * Returns an Observable that transforms items emitted by the source Observable into Integers by using a
     * function you provide and then emits the Integer average of the complete sequence of transformed values.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to transform an item emitted by the source Observable into an Integer
     * @return an Observable that emits a single item: the Integer average of the complete sequence of items
     *         emitted by the source Observable when transformed into Integers by the specified function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageInteger()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public final Observable<Integer> averageInteger(Func1<? super T, Integer> valueExtractor) {
        return o.lift(new OperatorAverageInteger<T>(valueExtractor));
    }

    /**
     * Returns an Observable that transforms items emitted by the source Observable into Longs by using a
     * function you provide and then emits the Long average of the complete sequence of transformed values.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/average.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to transform an item emitted by the source Observable into a Long
     * @return an Observable that emits a single item: the Long average of the complete sequence of items
     *         emitted by the source Observable when transformed into Longs by the specified function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-averageinteger-averagelong-averagefloat-and-averagedouble">RxJava Wiki: averageLong()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average.aspx">MSDN: Observable.Average</a>
     */
    public final Observable<Long> averageLong(Func1<? super T, Long> valueExtractor) {
        return o.lift(new OperatorAverageLong<T>(valueExtractor));
    }

    /**
     * Returns an Observable that emits the maximum item emitted by the source Observable, according to the
     * specified comparator. If there is more than one item with the same maximum value, it emits the
     * last-emitted of these.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/max.png" alt="">
     *
     * @param comparator
     *            the comparer used to compare items
     * @return an Observable that emits the maximum item emitted by the source Observable, according to the
     *         specified comparator
     * @throws IllegalArgumentException
     *             if the source is empty
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-max">RxJava Wiki: max()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/hh211635.aspx">MSDN: Observable.Max</a>
     */
    public final Observable<T> max(Comparator<? super T> comparator) {
        return OperatorMinMax.max(o, comparator);
    }

    /**
     * Returns an Observable that emits the minimum item emitted by the source Observable, according to a
     * specified comparator. If there is more than one such item, it returns the last-emitted one.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/min.png" alt="">
     *
     * @param comparator
     *            the comparer used to compare elements
     * @return an Observable that emits the minimum item emitted by the source Observable according to the
     *         specified comparator
     * @throws IllegalArgumentException
     *             if the source is empty
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-min">RxJava Wiki: min()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/hh229095.aspx">MSDN: Observable.Min</a>
     */
    public final Observable<T> min(Comparator<? super T> comparator) {
        return OperatorMinMax.min(o, comparator);
    }

    /**
     * Returns an Observable that extracts a Double from each of the items emitted by the source Observable via
     * a function you specify, and then emits the sum of these Doubles.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to extract a Double from each item emitted by the source Observable
     * @return an Observable that emits the Double sum of the Double values corresponding to the items emitted
     *         by the source Observable as transformed by the provided function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumDouble()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public final Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
        // NOTE(review): uses sumAtLeastOneDoubles, whereas the static sumDouble(Observable) uses
        // sumDoubles; presumably the two differ for an empty source - confirm in OperatorSum.
        return OperatorSum.sumAtLeastOneDoubles(o.map(valueExtractor));
    }

    /**
     * Returns an Observable that extracts a Float from each of the items emitted by the source Observable via
     * a function you specify, and then emits the sum of these Floats.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to extract a Float from each item emitted by the source Observable
     * @return an Observable that emits the Float sum of the Float values corresponding to the items emitted by
     *         the source Observable as transformed by the provided function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumFloat()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public final Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
        // NOTE(review): uses sumAtLeastOneFloats, whereas the static sumFloat(Observable) uses
        // sumFloats; presumably the two differ for an empty source - confirm in OperatorSum.
        return OperatorSum.sumAtLeastOneFloats(o.map(valueExtractor));
    }

    /**
     * Returns an Observable that extracts an Integer from each of the items emitted by the source Observable
     * via a function you specify, and then emits the sum of these Integers.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to extract an Integer from each item emitted by the source Observable
     * @return an Observable that emits the Integer sum of the Integer values corresponding to the items emitted
     *         by the source Observable as transformed by the provided function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumInteger()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public final Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
        // NOTE(review): uses sumAtLeastOneIntegers, whereas the static sumInteger(Observable) uses
        // sumIntegers; presumably the two differ for an empty source - confirm in OperatorSum.
        return OperatorSum.sumAtLeastOneIntegers(o.map(valueExtractor));
    }

    /**
     * Returns an Observable that extracts a Long from each of the items emitted by the source Observable via a
     * function you specify, and then emits the sum of these Longs.
     * <p>
     * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sum.f.png" alt="">
     *
     * @param valueExtractor
     *            the function to extract a Long from each item emitted by the source Observable
     * @return an Observable that emits the Long sum of the Long values corresponding to the items emitted by
     *         the source Observable as transformed by the provided function
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-suminteger-sumlong-sumfloat-and-sumdouble">RxJava Wiki: sumLong()</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
     */
    public final Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
        // NOTE(review): uses sumAtLeastOneLongs, whereas the static sumLong(Observable) uses
        // sumLongs; presumably the two differ for an empty source - confirm in OperatorSum.
        return OperatorSum.sumAtLeastOneLongs(o.map(valueExtractor));
    }
}