public class Streams extends Object
Stream
, Streams provide for common transformations from a few structures such as
Iterable or Future to a Stream, in addition to provide for combinatory operations (merge, switchOnNext...).
Examples of use (In Java8 but would also work with Anonymous classes or Groovy Closures for instance):
Streams.just(1, 2, 3).map(i -> i*2) //...
Broadcaster<String> stream = Streams.broadcast()
strean.map(i -> i*2).consume(System.out::println);
stream.onNext("hello");
Stream.create( subscriber -> {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onComplete();
}).consume(System.out::println);
Broadcaster<Integer> inputStream1 = Broadcaster.create(env);
Broadcaster<Integer> inputStream2 = Broadcaster.create(env);
Streams.merge(environment, inputStream1, inputStream2).map(i -> i*2).consume(System.out::println);
Modifier | Constructor and Description |
---|---|
protected |
Streams() |
Modifier and Type | Method and Description |
---|---|
static void |
await(org.reactivestreams.Publisher<?> publisher)
Wait 30 Seconds until a terminal signal from the passed publisher has been emitted.
|
static void |
await(org.reactivestreams.Publisher<?> publisher,
long timeout)
Wait {code timeout} Seconds until a terminal signal from the passed publisher has been emitted.
|
static void |
await(org.reactivestreams.Publisher<?> publisher,
long timeout,
TimeUnit unit)
Wait {code timeout} in
unit until a terminal signal from the passed publisher has been emitted. |
static void |
await(org.reactivestreams.Publisher<?> publisher,
long timeout,
TimeUnit unit,
boolean request)
Wait {code timeout} in
unit until a terminal signal from the passed publisher has been emitted. |
static <TUPLE extends Tuple,V> |
combineLatest(List<? extends org.reactivestreams.Publisher<?>> sources,
Function<TUPLE,? extends V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <E,TUPLE extends Tuple,V> |
combineLatest(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<E>> sources,
Function<TUPLE,? extends V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,V> Stream<V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
Function<Tuple2<T1,T2>,? extends V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,T3,V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
Function<Tuple3<T1,T2,T3>,? extends V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,T3,T4,V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
Function<Tuple4<T1,T2,T3,T4>,V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,T3,T4,T5,V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
Function<Tuple5<T1,T2,T3,T4,T5>,V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,T3,T4,T5,T6,V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
org.reactivestreams.Publisher<? extends T6> source6,
Function<Tuple6<T1,T2,T3,T4,T5,T6>,V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,T3,T4,T5,T6,T7,V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
org.reactivestreams.Publisher<? extends T6> source6,
org.reactivestreams.Publisher<? extends T7> source7,
Function<Tuple7<T1,T2,T3,T4,T5,T6,T7>,V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T1,T2,T3,T4,T5,T6,T7,T8,V> |
combineLatest(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
org.reactivestreams.Publisher<? extends T6> source6,
org.reactivestreams.Publisher<? extends T7> source7,
org.reactivestreams.Publisher<? extends T8> source8,
Function<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>,? extends V> combinator)
Build a Stream whose data are generated by the combination of the most recent published values from
all publishers.
|
static <T> Stream<T> |
concat(Iterable<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> concatdPublishers)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6,
org.reactivestreams.Publisher<? extends T> source7)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
concat(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6,
org.reactivestreams.Publisher<? extends T> source7,
org.reactivestreams.Publisher<? extends T> source8)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
create(org.reactivestreams.Publisher<T> publisher)
Build a custom sequence Stream from the passed
Publisher that will be
subscribed on the
first
request from the new subscriber. |
static <T,C> Stream<T> |
createWith(BiConsumer<Long,SubscriberWithContext<T,C>> requestConsumer,
Function<org.reactivestreams.Subscriber<? super T>,C> contextFactory)
Create a
Stream reacting on requests with the passed BiConsumer
The argument contextFactory is executed once by new subscriber to generate a context shared by every
request calls. |
static <T,C> Stream<T> |
createWith(BiConsumer<Long,SubscriberWithContext<T,C>> requestConsumer,
Function<org.reactivestreams.Subscriber<? super T>,C> contextFactory,
Consumer<C> shutdownConsumer)
Create a
Stream reacting on requests with the passed BiConsumer . |
static <T> Stream<T> |
createWith(BiConsumer<Long,SubscriberWithContext<T,Void>> requestConsumer)
Create a
Stream reacting on requests with the passed BiConsumer |
static <T> Stream<T> |
defer(Supplier<? extends org.reactivestreams.Publisher<T>> supplier)
Supply a
Publisher everytime subscribe is called on the returned stream. |
static <T> Stream<T> |
empty()
Build a Stream that will only emit a complete signal to any new subscriber.
|
static <O,T extends Throwable> |
fail(T throwable)
Build a Stream that will only emit an error signal to any new subscriber.
|
static <T> Stream<T> |
from(Future<? extends T> future)
Build a Stream that will only emit the result of the future and then complete.
|
static <T> Stream<T> |
from(Future<? extends T> future,
long time,
TimeUnit unit)
Build a Stream that will only emit the result of the future and then complete.
|
static <T> Stream<T> |
from(Iterable<? extends T> values)
Build a Stream whom data is sourced by each element of the passed iterable on subscription request.
|
static <T> Stream<T> |
from(T[] values)
Build a Stream whom data is sourced by each element of the passed array on subscription request.
|
static <T> Stream<T> |
generate(Supplier<? extends T> value)
Build a Stream whose data is generated by the passed supplier on subscription request.
|
static <T> Stream<List<T>> |
join(List<? extends org.reactivestreams.Publisher<? extends T>> sources)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<T>> source)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6,
org.reactivestreams.Publisher<? extends T> source7)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<List<T>> |
join(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6,
org.reactivestreams.Publisher<? extends T> source7,
org.reactivestreams.Publisher<? extends T> source8)
Build a Synchronous Stream whose data are aggregated from the passed publishers
(1 element consumed for each merged publisher. resulting in an array of size of .
|
static <T> Stream<T> |
just(T value1)
Build a Stream whom data is sourced by the passed element on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2,
T value3)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2,
T value3,
T value4)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2,
T value3,
T value4,
T value5)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2,
T value3,
T value4,
T value5,
T value6)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2,
T value3,
T value4,
T value5,
T value6,
T value7)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
just(T value1,
T value2,
T value3,
T value4,
T value5,
T value6,
T value7,
T value8)
Build a Stream whom data is sourced by each element of the passed iterable on subscription
request.
|
static <T> Stream<T> |
merge(Iterable<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T,E extends T> |
merge(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<E>> mergedPublishers)
Build a Synchronous Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6,
org.reactivestreams.Publisher<? extends T> source7)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
merge(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
org.reactivestreams.Publisher<? extends T> source3,
org.reactivestreams.Publisher<? extends T> source4,
org.reactivestreams.Publisher<? extends T> source5,
org.reactivestreams.Publisher<? extends T> source6,
org.reactivestreams.Publisher<? extends T> source7,
org.reactivestreams.Publisher<? extends T> source8)
Build a Stream whose data are generated by the passed publishers.
|
static <T> Stream<T> |
never()
Build a Stream that will never emit anything.
|
static Stream<Long> |
period(long period)
Build a Stream that will emit ever increasing counter from 0 after on each period from the subscribe
call.
|
static Stream<Long> |
period(long delay,
long period)
Build a Stream that will emit ever increasing counter from 0 after the time delay on each period.
|
static Stream<Long> |
period(long delay,
long period,
TimeUnit unit)
Build a Stream that will emit ever increasing counter from 0 after the subscribe call on each period.
|
static Stream<Long> |
period(long period,
TimeUnit unit)
Build a Stream that will emit ever increasing counter from 0 after the subscribe call on each period.
|
static Stream<Long> |
period(Timer timer,
long period)
Build a Stream that will emit ever increasing counter from 0 after on each period from the subscribe
call.
|
static Stream<Long> |
period(Timer timer,
long delay,
long period)
Build a Stream that will emit ever increasing counter from 0 after the time delay on each period.
|
static Stream<Long> |
period(Timer timer,
long delay,
long period,
TimeUnit unit)
Build a Stream that will emit ever increasing counter from 0 after the time delay on each period.
|
static Stream<Long> |
period(Timer timer,
long period,
TimeUnit unit)
Build a Stream that will emit ever increasing counter from 0 after the subscribe call on each period.
|
static Stream<Long> |
range(long start,
long end)
Build a Stream that will only emit a sequence of longs within the specified range and then
complete.
|
static <T> Action<org.reactivestreams.Publisher<? extends T>,T> |
switchOnNext()
Build a Synchronous Action whose data are emitted by the most recent
Action.onNext(Object)
signaled publisher. |
static <T> Action<org.reactivestreams.Publisher<? extends T>,T> |
switchOnNext(Dispatcher dispatcher)
Build an Action whose data are emitted by the most recent
Action.onNext(Object) signaled
publisher. |
static <T> Stream<T> |
switchOnNext(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers)
Build a Synchronous Stream whose data are emitted by the most recent passed publisher.
|
static <T> Stream<T> |
switchOnNext(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers,
Dispatcher dispatcher)
Build a Stream whose data are emitted by the most recent passed publisher.
|
static Stream<Long> |
timer(long delay)
Build a Stream that will only emit 0l after the time delay and then complete.
|
static Stream<Long> |
timer(long delay,
TimeUnit unit)
Build a Stream that will only emit 0l after the time delay and then complete.
|
static Stream<Long> |
timer(Timer timer,
long delay)
Build a Stream that will only emit 0l after the time delay and then complete.
|
static Stream<Long> |
timer(Timer timer,
long delay,
TimeUnit unit)
Build a Stream that will only emit 0l after the time delay and then complete.
|
static <T> Stream<T> |
wrap(org.reactivestreams.Publisher<T> publisher)
A simple decoration of the given
Publisher to expose Stream API and proxy any subscribe call to
the publisher. |
static <TUPLE extends Tuple,V> |
zip(List<? extends org.reactivestreams.Publisher<?>> sources,
Function<TUPLE,? extends V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <E,TUPLE extends Tuple,V> |
zip(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<E>> sources,
Function<TUPLE,? extends V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,V> Stream<V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
Function<Tuple2<T1,T2>,? extends V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,T3,V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
Function<Tuple3<T1,T2,T3>,? extends V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,T3,T4,V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
Function<Tuple4<T1,T2,T3,T4>,V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,T3,T4,T5,V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
Function<Tuple5<T1,T2,T3,T4,T5>,V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,T3,T4,T5,T6,V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
org.reactivestreams.Publisher<? extends T6> source6,
Function<Tuple6<T1,T2,T3,T4,T5,T6>,V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,T3,T4,T5,T6,T7,V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
org.reactivestreams.Publisher<? extends T6> source6,
org.reactivestreams.Publisher<? extends T7> source7,
Function<Tuple7<T1,T2,T3,T4,T5,T6,T7>,V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
static <T1,T2,T3,T4,T5,T6,T7,T8,V> |
zip(org.reactivestreams.Publisher<? extends T1> source1,
org.reactivestreams.Publisher<? extends T2> source2,
org.reactivestreams.Publisher<? extends T3> source3,
org.reactivestreams.Publisher<? extends T4> source4,
org.reactivestreams.Publisher<? extends T5> source5,
org.reactivestreams.Publisher<? extends T6> source6,
org.reactivestreams.Publisher<? extends T7> source7,
org.reactivestreams.Publisher<? extends T8> source8,
Function<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>,? extends V> combinator)
Build a Stream whose data are generated by the passed publishers.
|
public static <T> Stream<T> create(org.reactivestreams.Publisher<T> publisher)
Publisher
that will be
subscribed on the
first
request from the new subscriber. It means that the passed Subscription.request(long)
manually triggered or automatically consumed by Stream.consume()
operations. The sequence
consists
of a series of calls to the Subscriber
argument:
onSubscribe?|onNext*|onError?|onComplete.
Strict application of this protocol is not enforced, e.g. onSubscribe is not required as a buffering subscription
will be created
anyway.
For simply decorating a given Publisher with Stream
API, and thus relying on the publisher to honour the
Reactive Streams protocol,
use the wrap(Publisher)
T
- the type of values passing through the Streampublisher
- the publisher to accept the Stream subscriberStream
public static <T> Stream<T> createWith(BiConsumer<Long,SubscriberWithContext<T,Void>> requestConsumer)
Stream
reacting on requests with the passed BiConsumer
T
- The type of the data sequencerequestConsumer
- A BiConsumer
with left argument request and right argument target subscriberpublic static <T,C> Stream<T> createWith(BiConsumer<Long,SubscriberWithContext<T,C>> requestConsumer, Function<org.reactivestreams.Subscriber<? super T>,C> contextFactory)
Stream
reacting on requests with the passed BiConsumer
The argument contextFactory
is executed once by new subscriber to generate a context shared by every
request calls.T
- The type of the data sequenceC
- The type of contextual information to be read by the requestConsumerrequestConsumer
- A BiConsumer
with left argument request and right argument target subscribercontextFactory
- A Function
called for every new subscriber returning an immutable context (IO
connection...)public static <T,C> Stream<T> createWith(BiConsumer<Long,SubscriberWithContext<T,C>> requestConsumer, Function<org.reactivestreams.Subscriber<? super T>,C> contextFactory, Consumer<C> shutdownConsumer)
Stream
reacting on requests with the passed BiConsumer
.
The argument contextFactory
is executed once by new subscriber to generate a context shared by every
request calls.
The argument shutdownConsumer
is executed once by subscriber termination event (cancel, onComplete,
onError).T
- The type of the data sequenceC
- The type of contextual information to be read by the requestConsumerrequestConsumer
- A BiConsumer
with left argument request and right argument target subscribercontextFactory
- A Function
called once for every new subscriber returning an immutable context
(IO connection...)shutdownConsumer
- A Consumer
called once everytime a subscriber terminates: cancel, onComplete(),
onError()public static <T> Stream<T> wrap(org.reactivestreams.Publisher<T> publisher)
Publisher
to expose Stream
API and proxy any subscribe call to
the publisher.
The Publisher has to first call onSubscribe and receive a subscription request callback before any onNext call or
will risk loosing events.T
- the type of values passing through the Streampublisher
- the publisher to decorate the Stream subscriberStream
public static <T> Stream<T> defer(Supplier<? extends org.reactivestreams.Publisher<T>> supplier)
Publisher
everytime subscribe is called on the returned stream. The passed .Supplier
will be invoked and it's up to the developer to choose to return a new instance of a Publisher
or reuse
one,
effecitvely behaving like wrap(Publisher)
.T
- the type of values passing through the Streamsupplier
- the publisher factory to call on subscribeStream
public static <T> Stream<T> empty()
Stream
public static <T> Stream<T> never()
Stream
public static <O,T extends Throwable> Stream<O> fail(T throwable)
Stream
public static <T> Stream<T> from(Iterable<? extends T> values)
It will use the passed dispatcher to emit signals.
T
- type of the valuesvalues
- The values to onNext()
Stream
based on the given valuespublic static <T> Stream<T> from(T[] values)
It will use the passed dispatcher to emit signals.
T
- type of the valuesvalues
- The values to onNext()
Stream
based on the given valuespublic static <T> Stream<T> from(Future<? extends T> future)
future
- the future to poll value fromStream
public static <T> Stream<T> from(Future<? extends T> future, long time, TimeUnit unit)
future
- the future to poll value fromStream
public static Stream<Long> range(long start, long end)
start
- the inclusive starting value to be emittedend
- the inclusive closing value to be emittedStream
public static Stream<Long> timer(long delay)
delay
- the timespan in SECONDS to wait before emitting 0l and complete signalsStream
public static Stream<Long> timer(Timer timer, long delay)
timer
- the timer to run ondelay
- the timespan in SECONDS to wait before emitting 0l and complete signalsStream
public static Stream<Long> timer(long delay, TimeUnit unit)
delay
- the timespan in [unit] to wait before emitting 0l and complete signalsunit
- the time unitStream
public static Stream<Long> timer(Timer timer, long delay, TimeUnit unit)
timer
- the timer to run ondelay
- the timespan in [unit] to wait before emitting 0l and complete signalsunit
- the time unitStream
public static Stream<Long> period(long period)
period
- the period in SECONDS before each following incrementStream
public static Stream<Long> period(Timer timer, long period)
timer
- the timer to run onperiod
- the period in SECONDS before each following incrementStream
public static Stream<Long> period(long delay, long period)
delay
- the timespan in SECONDS to wait before emitting 0lperiod
- the period in SECONDS before each following incrementStream
public static Stream<Long> period(Timer timer, long delay, long period)
timer
- the timer to run ondelay
- the timespan in SECONDS to wait before emitting 0lperiod
- the period in SECONDS before each following incrementStream
public static Stream<Long> period(long period, TimeUnit unit)
period
- the period in [unit] before each following incrementunit
- the time unitStream
public static Stream<Long> period(Timer timer, long period, TimeUnit unit)
timer
- the timer to run onperiod
- the period in [unit] before each following incrementunit
- the time unitStream
public static Stream<Long> period(long delay, long period, TimeUnit unit)
delay
- the timespan in [unit] to wait before emitting 0lperiod
- the period in [unit] before each following incrementunit
- the time unitStream
public static Stream<Long> period(Timer timer, long delay, long period, TimeUnit unit)
timer
- the timer to run ondelay
- the timespan in [unit] to wait before emitting 0lperiod
- the period in [unit] before each following incrementunit
- the time unitStream
public static <T> Stream<T> just(T value1)
T
- type of the valuesvalue1
- The only value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2, T value3)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
value3
- The third value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2, T value3, T value4)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
value3
- The third value to onNext()
value4
- The fourth value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
value3
- The third value to onNext()
value4
- The fourth value to onNext()
value5
- The fifth value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5, T value6)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
value3
- The third value to onNext()
value4
- The fourth value to onNext()
value5
- The fifth value to onNext()
value6
- The sixth value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5, T value6, T value7)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
value3
- The third value to onNext()
value4
- The fourth value to onNext()
value5
- The fifth value to onNext()
value6
- The sixth value to onNext()
value7
- The seventh value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5, T value6, T value7, T value8)
T
- type of the valuesvalue1
- The first value to onNext()
value2
- The second value to onNext()
value3
- The third value to onNext()
value4
- The fourth value to onNext()
value5
- The fifth value to onNext()
value6
- The sixth value to onNext()
value7
- The seventh value to onNext()
value8
- The eigth value to onNext()
Stream
based on the given valuespublic static <T> Stream<T> generate(Supplier<? extends T> value)
T
- type of the valuevalue
- The value to onNext()
Stream
based on the produced valuepublic static <T> Action<org.reactivestreams.Publisher<? extends T>,T> switchOnNext()
Action.onNext(Object)
signaled publisher.
The stream will complete once both the publishers source and the last switched to publisher have completed.T
- type of the valueAction
accepting publishers and producing inner data Tpublic static <T> Action<org.reactivestreams.Publisher<? extends T>,T> switchOnNext(Dispatcher dispatcher)
Action.onNext(Object)
signaled
publisher.
The stream will complete once both the publishers source and the last switched to publisher have completed.T
- type of the valuedispatcher
- The dispatcher to execute the signalsAction
accepting publishers and producing inner data Tpublic static <T> Stream<T> switchOnNext(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers)
T
- type of the valuemergedPublishers
- The publisher of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> switchOnNext(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers, Dispatcher dispatcher)
T
- type of the valuemergedPublishers
- The publisher of upstream Publisher
to subscribe to.dispatcher
- The dispatcher to execute the signalsStream
based on the produced valuepublic static <T> Stream<T> concat(Iterable<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers)
T
- type of the valuemergedPublishers
- The list of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<? extends T>> concatdPublishers)
T
- type of the valueconcatdPublishers
- The publisher of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6, org.reactivestreams.Publisher<? extends T> source7)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> concat(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6, org.reactivestreams.Publisher<? extends T> source7, org.reactivestreams.Publisher<? extends T> source8)
T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.source8
- The eigth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(Iterable<? extends org.reactivestreams.Publisher<? extends T>> mergedPublishers)
Stream
publisher type.T
- type of the valuemergedPublishers
- The list of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T,E extends T> Stream<E> merge(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<E>> mergedPublishers)
Stream
publisher type.T
- type of the valuemergedPublishers
- The publisher of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6, org.reactivestreams.Publisher<? extends T> source7)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<T> merge(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6, org.reactivestreams.Publisher<? extends T> source7, org.reactivestreams.Publisher<? extends T> source8)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.source8
- The eigth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T1,T2,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, Function<Tuple2<T1,T2>,? extends V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, Function<Tuple3<T1,T2,T3>,? extends V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, Function<Tuple4<T1,T2,T3,T4>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, Function<Tuple5<T1,T2,T3,T4,T5>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,T6,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, org.reactivestreams.Publisher<? extends T6> source6, Function<Tuple6<T1,T2,T3,T4,T5,T6>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,T6,T7,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, org.reactivestreams.Publisher<? extends T6> source6, org.reactivestreams.Publisher<? extends T7> source7, Function<Tuple7<T1,T2,T3,T4,T5,T6,T7>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6T7
- type of the value from source7V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,T6,T7,T8,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, org.reactivestreams.Publisher<? extends T6> source6, org.reactivestreams.Publisher<? extends T7> source7, org.reactivestreams.Publisher<? extends T8> source8, Function<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>,? extends V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6T7
- type of the value from source7T8
- type of the value from source8V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.source8
- The eigth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <TUPLE extends Tuple,V> Stream<V> combineLatest(List<? extends org.reactivestreams.Publisher<?>> sources, Function<TUPLE,? extends V> combinator)
Stream
publisher type.V
- The produced output after transformation by TUPLE
- The type of tuple to use that must match source Publishers typesources
- The list of upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <E,TUPLE extends Tuple,V> Stream<V> combineLatest(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<E>> sources, Function<TUPLE,? extends V> combinator)
Stream
publisher type.V
- The produced output after transformation by E
- The inner type of sources
- The publisher of upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, Function<Tuple2<T1,T2>,? extends V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, Function<Tuple3<T1,T2,T3>,? extends V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, Function<Tuple4<T1,T2,T3,T4>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, Function<Tuple5<T1,T2,T3,T4,T5>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,T6,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, org.reactivestreams.Publisher<? extends T6> source6, Function<Tuple6<T1,T2,T3,T4,T5,T6>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,T6,T7,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, org.reactivestreams.Publisher<? extends T6> source6, org.reactivestreams.Publisher<? extends T7> source7, Function<Tuple7<T1,T2,T3,T4,T5,T6,T7>,V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6T7
- type of the value from source7V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T1,T2,T3,T4,T5,T6,T7,T8,V> Stream<V> zip(org.reactivestreams.Publisher<? extends T1> source1, org.reactivestreams.Publisher<? extends T2> source2, org.reactivestreams.Publisher<? extends T3> source3, org.reactivestreams.Publisher<? extends T4> source4, org.reactivestreams.Publisher<? extends T5> source5, org.reactivestreams.Publisher<? extends T6> source6, org.reactivestreams.Publisher<? extends T7> source7, org.reactivestreams.Publisher<? extends T8> source8, Function<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>,? extends V> combinator)
Stream
publisher type.T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6T7
- type of the value from source7T8
- type of the value from source8V
- The produced output after transformation by source1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.source8
- The eigth upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <TUPLE extends Tuple,V> Stream<V> zip(List<? extends org.reactivestreams.Publisher<?>> sources, Function<TUPLE,? extends V> combinator)
Stream
publisher type.V
- The produced output after transformation by TUPLE
- The type of tuple to use that must match source Publishers typesources
- The list of upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <E,TUPLE extends Tuple,V> Stream<V> zip(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<E>> sources, Function<TUPLE,? extends V> combinator)
Stream
publisher type.V
- The produced output after transformation by E
- The inner type of sources
- The publisher of upstream Publisher
to subscribe to.combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamStream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fourth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6, org.reactivestreams.Publisher<? extends T> source7)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, org.reactivestreams.Publisher<? extends T> source3, org.reactivestreams.Publisher<? extends T> source4, org.reactivestreams.Publisher<? extends T> source5, org.reactivestreams.Publisher<? extends T> source6, org.reactivestreams.Publisher<? extends T> source7, org.reactivestreams.Publisher<? extends T> source8)
Stream
publisher type.T
- type of the valuesource1
- The first upstream Publisher
to subscribe to.source2
- The second upstream Publisher
to subscribe to.source3
- The third upstream Publisher
to subscribe to.source4
- The fourth upstream Publisher
to subscribe to.source5
- The fifth upstream Publisher
to subscribe to.source6
- The sixth upstream Publisher
to subscribe to.source7
- The seventh upstream Publisher
to subscribe to.source8
- The eigth upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(List<? extends org.reactivestreams.Publisher<? extends T>> sources)
Stream
publisher type.T
- type of the valuesources
- The list of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static <T> Stream<List<T>> join(org.reactivestreams.Publisher<? extends org.reactivestreams.Publisher<T>> source)
Stream
publisher type.T
- type of the valuesource
- The publisher of upstream Publisher
to subscribe to.Stream
based on the produced valuepublic static void await(org.reactivestreams.Publisher<?> publisher) throws Throwable
publisher
- the publisher to listen for terminal signalsThrowable
public static void await(org.reactivestreams.Publisher<?> publisher, long timeout) throws Throwable
publisher
- the publisher to listen for terminal signalstimeout
- the maximum wait time in secondsThrowable
public static void await(org.reactivestreams.Publisher<?> publisher, long timeout, TimeUnit unit) throws Throwable
unit
until a terminal signal from the passed publisher has been emitted.
If the terminal signal is an error, it will propagate to the caller.
Effectively this is making sure a stream has completed before the return of this call.
It is usually used in controlled environment such as tests.publisher
- the publisher to listen for terminal signalstimeout
- the maximum wait time in unitunit
- the TimeUnit to use for the timeoutThrowable
public static void await(org.reactivestreams.Publisher<?> publisher, long timeout, TimeUnit unit, boolean request) throws Throwable
unit
until a terminal signal from the passed publisher has been emitted.
If the terminal signal is an error, it will propagate to the caller.
Effectively this is making sure a stream has completed before the return of this call.
It is usually used in controlled environment such as tests.publisher
- the publisher to listen for terminal signalstimeout
- the maximum wait time in unitunit
- the TimeUnit to use for the timeoutThrowable
Copyright © 2017. All rights reserved.