PollerFlux<T,U> Class
java.lang.Object
reactor.core.publisher.Flux
com.azure.core.util.polling.PollerFlux<T,U>
Type Parameters
T
The type of poll response value.
U
The type of the final result of long-running operation.
public final class PollerFlux<T,U> extends reactor.core.publisher.Flux <AsyncPollResponse <T ,U >>
A Flux that simplifies the task of executing long-running operations against an Azure service. A subscription to PollerFlux<T,U> initiates a long-running operation and polls the status until it completes.
Code samples
Instantiating and subscribing to PollerFlux
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMillis(800));
// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
(context) -> Mono.empty(),
// Define your custom poll operation
(context) -> {
if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
System.out.println("Returning intermediate response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
"Operation in progress."));
} else {
System.out.println("Returning final response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
"Operation completed."));
}
},
(activationResponse, context) -> Mono.error(new RuntimeException("Cancellation is not supported")),
(context) -> Mono.just("Final Output"));
// Listen to poll responses
poller.subscribe(response -> {
// Process poll response
System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Asynchronously wait for polling to complete and then retrieve the final result
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));
// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
(context) -> Mono.empty(),
(context) -> {
if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
System.out.println("Returning intermediate response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
"Operation in progress."));
} else {
System.out.println("Returning final response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
"Operation completed."));
}
},
(activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"),
(context) -> Mono.just("FromServer:FinalOutput"));
poller.take(Duration.ofMinutes(30))
.last()
.flatMap(asyncPollResponse -> {
if (asyncPollResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
// operation completed successfully, retrieving final result.
return asyncPollResponse
.getFinalResult();
} else {
return Mono.error(new RuntimeException("polling completed unsuccessfully with status:"
+ asyncPollResponse.getStatus()));
}
}).block();
Block for polling to complete and then retrieve the final result
AsyncPollResponse<String, String> terminalResponse = pollerFlux.blockLast();
System.out.printf("Polling complete. Final Status: %s", terminalResponse.getStatus());
if (terminalResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
String finalResult = terminalResponse.getFinalResult().block();
System.out.printf("Polling complete. Final Status: %s", finalResult);
}
Asynchronously poll until poller receives matching status
final Predicate<AsyncPollResponse<String, String>> isComplete = response -> {
return response.getStatus() != LongRunningOperationStatus.IN_PROGRESS
&& response.getStatus() != LongRunningOperationStatus.NOT_STARTED;
};
pollerFlux
.takeUntil(isComplete)
.subscribe(completed -> {
System.out.println("Completed poll response, status: " + completed.getStatus());
});
Asynchronously cancel the long running operation
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));
// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
(context) -> Mono.empty(),
(context) -> {
if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
System.out.println("Returning intermediate response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
"Operation in progress."));
} else {
System.out.println("Returning final response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
"Operation completed."));
}
},
(activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"),
(context) -> Mono.just("FromServer:FinalOutput"));
// Asynchronously wait 30 minutes to complete the polling, if not completed
// within in the time then cancel the server operation.
poller.take(Duration.ofMinutes(30))
.last()
.flatMap(asyncPollResponse -> {
if (!asyncPollResponse.getStatus().isComplete()) {
return asyncPollResponse
.cancelOperation()
.then(Mono.error(new RuntimeException("Operation is cancelled!")));
} else {
return Mono.just(asyncPollResponse);
}
}).block();
Instantiating and subscribing to PollerFlux from a known polling strategy
// Create poller instance
PollerFlux<BinaryData, String> poller = PollerFlux.create(
Duration.ofMillis(100),
// pass in your custom activation operation
() -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
HttpMethod.POST,
"http://httpbin.org"),
202,
new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
null)),
new OperationResourcePollingStrategy<>(new HttpPipelineBuilder().build()),
TypeReference.createInstance(BinaryData.class),
TypeReference.createInstance(String.class));
// Listen to poll responses
poller.subscribe(response -> {
// Process poll response
System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Instantiating and subscribing to PollerFlux from a custom polling strategy
// Create custom polling strategy based on OperationResourcePollingStrategy
PollingStrategy<BinaryData, String> strategy = new OperationResourcePollingStrategy<BinaryData, String>(
new HttpPipelineBuilder().build()) {
// override any interface method to customize the polling behavior
@Override
public Mono<PollResponse<BinaryData>> poll(PollingContext<BinaryData> context,
TypeReference<BinaryData> pollResponseType) {
return Mono.just(new PollResponse<>(
LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
BinaryData.fromString("")));
}
};
// Create poller instance
PollerFlux<BinaryData, String> poller = PollerFlux.create(
Duration.ofMillis(100),
// pass in your custom activation operation
() -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
HttpMethod.POST,
"http://httpbin.org"),
202,
new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
null)),
strategy,
TypeReference.createInstance(BinaryData.class),
TypeReference.createInstance(String.class));
// Listen to poll responses
poller.subscribe(response -> {
// Process poll response
System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Constructor Summary
Method Summary
Modifier and Type
Method and Description
static
PollerFlux <T ,U >
<T,U>create(Duration pollInterval, Function<PollingContext<T>,Mono<PollResponse<T>>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono<U>> fetchResultOperation)
Creates PollerFlux.
static
PollerFlux <T ,U >
<T,U>create(Duration pollInterval, Supplier<Mono<? extends Response<?>>> initialOperation, PollingStrategy<T,U> strategy, TypeReference<T> pollResponseType, TypeReference<U> resultType)
Creates PollerFlux.
static
PollerFlux <T ,U >
<T,U>error(Exception ex)
Creates a PollerFlux instance that returns an error on subscription.
Duration
getPollInterval()
Returns the current polling duration for this PollerFlux<T,U> instance.
SyncPoller <T ,U >
getSyncPoller()
Gets a synchronous blocking poller.
PollerFlux <T ,U >
setPollInterval(Duration pollInterval)
Sets the poll interval for this poller.
void
subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Methods inherited from java.lang.Object
Methods inherited from reactor.core.publisher.Flux
reactor.core.publisher.Flux.<A>reduce(A,java.util.function.BiFunction<A,
reactor.core.publisher.Flux.<A>reduceWith(java.util.function.Supplier<A>,java.util.function.BiFunction<A,
reactor.core.publisher.Flux.<A>scan(A,java.util.function.BiFunction<A,
reactor.core.publisher.Flux.<A>scanWith(java.util.function.Supplier<A>,java.util.function.BiFunction<A,
reactor.core.publisher.Flux.<C>buffer
reactor.core.publisher.Flux.<C>buffer
reactor.core.publisher.Flux.<C>buffer(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<C>bufferTimeout
reactor.core.publisher.Flux.<C>bufferTimeout
reactor.core.publisher.Flux.<C>bufferTimeout
reactor.core.publisher.Flux.<C>bufferTimeout
reactor.core.publisher.Flux.<E>cast
reactor.core.publisher.Flux.<E>collect(java.util.function.Supplier<E>,java.util.function.BiConsumer<E,
reactor.core.publisher.Flux.<E>doOnError(java.lang.Class<E>,java.util.function.Consumer<
reactor.core.publisher.Flux.<E>onErrorContinue
reactor.core.publisher.Flux.<E>onErrorContinue
reactor.core.publisher.Flux.<E>onErrorMap(java.lang.Class<E>,java.util.function.Function<
reactor.core.publisher.Flux.<E>onErrorResume(java.lang.Class<E>,java.util.function.Function<
reactor.core.publisher.Flux.<E>onErrorReturn
reactor.core.publisher.Flux.<E>subscribeWith
reactor.core.publisher.Flux.<I,O>zip(java.util.function.Function<
reactor.core.publisher.Flux.<I,O>zip(java.util.function.Function<
reactor.core.publisher.Flux.<I>first(java.lang.Iterable<
reactor.core.publisher.Flux.<I>first(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>firstWithSignal(java.lang.Iterable<
reactor.core.publisher.Flux.<I>firstWithSignal(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>firstWithValue(java.lang.Iterable<
reactor.core.publisher.Flux.<I>firstWithValue(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>index(java.util.function.BiFunction<
reactor.core.publisher.Flux.<I>merge(int,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>merge(java.lang.Iterable<
reactor.core.publisher.Flux.<I>merge(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeComparing(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeDelayError(int,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeOrdered(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergePriority(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeSequential(int,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeSequential(java.lang.Iterable<
reactor.core.publisher.Flux.<I>mergeSequential(java.lang.Iterable<
reactor.core.publisher.Flux.<I>mergeSequential(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeSequentialDelayError(int,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<I>mergeSequentialDelayError(java.lang.Iterable<
reactor.core.publisher.Flux.<K,V>collectMap(java.util.function.Function<
reactor.core.publisher.Flux.<K,V>collectMap(java.util.function.Function<
reactor.core.publisher.Flux.<K,V>collectMultimap(java.util.function.Function<
reactor.core.publisher.Flux.<K,V>collectMultimap(java.util.function.Function<
reactor.core.publisher.Flux.<K,V>groupBy(java.util.function.Function<
reactor.core.publisher.Flux.<K,V>groupBy(java.util.function.Function<
reactor.core.publisher.Flux.<K>collectMap(java.util.function.Function<
reactor.core.publisher.Flux.<K>collectMultimap(java.util.function.Function<
reactor.core.publisher.Flux.<K>groupBy(java.util.function.Function<
reactor.core.publisher.Flux.<K>groupBy(java.util.function.Function<
reactor.core.publisher.Flux.<O>error
reactor.core.publisher.Flux.<O>zip(java.lang.Iterable<
reactor.core.publisher.Flux.<O>zip(java.lang.Iterable<
reactor.core.publisher.Flux.<P>as(java.util.function.Function<
reactor.core.publisher.Flux.<R,A>collect(java.util.stream.Collector<
reactor.core.publisher.Flux.<R>concatMapIterable(java.util.function.Function<
reactor.core.publisher.Flux.<R>concatMapIterable(java.util.function.Function<
reactor.core.publisher.Flux.<R>doOnDiscard(java.lang.Class<R>,java.util.function.Consumer<
reactor.core.publisher.Flux.<R>flatMap(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMap(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMapIterable(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMapIterable(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMapSequential(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMapSequential(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMapSequential(java.util.function.Function<
reactor.core.publisher.Flux.<R>flatMapSequentialDelayError(java.util.function.Function<
reactor.core.publisher.Flux.<R>handle(java.util.function.BiConsumer<
reactor.core.publisher.Flux.<R>publish(java.util.function.Function<
reactor.core.publisher.Flux.<R>publish(java.util.function.Function<
reactor.core.publisher.Flux.<T,D>using(java.util.concurrent.Callable<
reactor.core.publisher.Flux.<T,D>using(java.util.concurrent.Callable<
reactor.core.publisher.Flux.<T,D>using(java.util.concurrent.Callable<
reactor.core.publisher.Flux.<T,D>using(java.util.concurrent.Callable<
reactor.core.publisher.Flux.<T,D>usingWhen(org.reactivestreams.Publisher<D>,java.util.function.Function<
reactor.core.publisher.Flux.<T,D>usingWhen(org.reactivestreams.Publisher<D>,java.util.function.Function<
reactor.core.publisher.Flux.<T,S>generate
reactor.core.publisher.Flux.<T,S>generate(java.util.concurrent.Callable<S>,java.util.function.BiFunction<S,reactor.core.publisher.SynchronousSink<T>,S>,java.util.function.Consumer<
reactor.core.publisher.Flux.<T,V>combineLatest(java.lang.Iterable<
reactor.core.publisher.Flux.<T,V>combineLatest(java.lang.Iterable<
reactor.core.publisher.Flux.<T,V>combineLatest(java.util.function.Function<java.lang.Object[],V>,int,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T,V>combineLatest(java.util.function.Function<java.lang.Object[],V>,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,O>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,T5,T6,T7,T8>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,T5,T6,T7>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,T5,T6,V>combineLatest(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,T5,T6>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,T5,V>combineLatest(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,T5>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4,V>combineLatest(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,T4>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3,V>combineLatest(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,T3>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2,V>combineLatest(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T1,T2>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T2,V>zipWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T2,V>zipWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T2,V>zipWithIterable(java.lang.Iterable<
reactor.core.publisher.Flux.<T2>zipWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T2>zipWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T2>zipWithIterable(java.lang.Iterable<
reactor.core.publisher.Flux.<T>concat(java.lang.Iterable<
reactor.core.publisher.Flux.<T>concat(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>concat(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>concat(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>concatDelayError(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>concatDelayError(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>concatDelayError(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>concatDelayError(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>create(java.util.function.Consumer<
reactor.core.publisher.Flux.<T>create(java.util.function.Consumer<
reactor.core.publisher.Flux.<T>defer(java.util.function.Supplier<
reactor.core.publisher.Flux.<T>deferContextual(java.util.function.Function<reactor.util.context.ContextView,
reactor.core.publisher.Flux.<T>empty
reactor.core.publisher.Flux.<T>error
reactor.core.publisher.Flux.<T>error(java.util.function.Supplier<
reactor.core.publisher.Flux.<T>from(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>fromArray
reactor.core.publisher.Flux.<T>fromIterable(java.lang.Iterable<
reactor.core.publisher.Flux.<T>fromStream(java.util.function.Supplier<java.util.stream.Stream<
reactor.core.publisher.Flux.<T>fromStream(java.util.stream.Stream<
reactor.core.publisher.Flux.<T>generate
reactor.core.publisher.Flux.<T>just
reactor.core.publisher.Flux.<T>just
reactor.core.publisher.Flux.<T>merge(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>merge(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>merge(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>mergeComparing(int,java.util.Comparator<
reactor.core.publisher.Flux.<T>mergeComparing(java.util.Comparator<
reactor.core.publisher.Flux.<T>mergeComparingDelayError(int,java.util.Comparator<
reactor.core.publisher.Flux.<T>mergeOrdered(int,java.util.Comparator<
reactor.core.publisher.Flux.<T>mergeOrdered(java.util.Comparator<
reactor.core.publisher.Flux.<T>mergePriority(int,java.util.Comparator<
reactor.core.publisher.Flux.<T>mergePriority(java.util.Comparator<
reactor.core.publisher.Flux.<T>mergePriorityDelayError(int,java.util.Comparator<
reactor.core.publisher.Flux.<T>mergeSequential(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>mergeSequential(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>mergeSequentialDelayError(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>never
reactor.core.publisher.Flux.<T>onAssembly
reactor.core.publisher.Flux.<T>onAssembly
reactor.core.publisher.Flux.<T>push(java.util.function.Consumer<
reactor.core.publisher.Flux.<T>push(java.util.function.Consumer<
reactor.core.publisher.Flux.<T>switchOnNext(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<T>switchOnNext(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<TRight,TLeftEnd,TRightEnd,R>groupJoin(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<TRight,TLeftEnd,TRightEnd,R>join(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<TUPLE,V>zip(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<U,R>withLatestFrom(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.<U,V,C>bufferWhen(org.reactivestreams.Publisher<U>,java.util.function.Function<
reactor.core.publisher.Flux.<U,V>bufferWhen(org.reactivestreams.Publisher<U>,java.util.function.Function<
reactor.core.publisher.Flux.<U,V>timeout(org.reactivestreams.Publisher<U>,java.util.function.Function<
reactor.core.publisher.Flux.<U,V>timeout(org.reactivestreams.Publisher<U>,java.util.function.Function<
reactor.core.publisher.Flux.<U,V>windowWhen(org.reactivestreams.Publisher<U>,java.util.function.Function<
reactor.core.publisher.Flux.<U>delaySubscription
reactor.core.publisher.Flux.<U>ofType
reactor.core.publisher.Flux.<U>sample
reactor.core.publisher.Flux.<U>sampleFirst(java.util.function.Function<
reactor.core.publisher.Flux.<U>sampleTimeout(java.util.function.Function<
reactor.core.publisher.Flux.<U>sampleTimeout(java.util.function.Function<
reactor.core.publisher.Flux.<U>timeout
reactor.core.publisher.Flux.<V,C>distinct(java.util.function.Function<
reactor.core.publisher.Flux.<V,C>distinct(java.util.function.Function<
reactor.core.publisher.Flux.<V>bufferUntilChanged(java.util.function.Function<
reactor.core.publisher.Flux.<V>bufferUntilChanged(java.util.function.Function<
reactor.core.publisher.Flux.<V>concatMap(java.util.function.Function<
reactor.core.publisher.Flux.<V>concatMap(java.util.function.Function<
reactor.core.publisher.Flux.<V>concatMapDelayError(java.util.function.Function<
reactor.core.publisher.Flux.<V>concatMapDelayError(java.util.function.Function<
reactor.core.publisher.Flux.<V>concatMapDelayError(java.util.function.Function<
reactor.core.publisher.Flux.<V>distinct(java.util.function.Function<
reactor.core.publisher.Flux.<V>distinctUntilChanged(java.util.function.Function<
reactor.core.publisher.Flux.<V>distinctUntilChanged(java.util.function.Function<
reactor.core.publisher.Flux.<V>flatMap(java.util.function.Function<
reactor.core.publisher.Flux.<V>flatMap(java.util.function.Function<
reactor.core.publisher.Flux.<V>flatMapDelayError(java.util.function.Function<
reactor.core.publisher.Flux.<V>map(java.util.function.Function<
reactor.core.publisher.Flux.<V>mapNotNull(java.util.function.Function<
reactor.core.publisher.Flux.<V>switchMap(java.util.function.Function<
reactor.core.publisher.Flux.<V>switchMap(java.util.function.Function<
reactor.core.publisher.Flux.<V>switchOnFirst(java.util.function.BiFunction<reactor.core.publisher.Signal<
reactor.core.publisher.Flux.<V>switchOnFirst(java.util.function.BiFunction<reactor.core.publisher.Signal<
reactor.core.publisher.Flux.<V>then
reactor.core.publisher.Flux.<V>thenMany
reactor.core.publisher.Flux.<V>transform(java.util.function.Function<
reactor.core.publisher.Flux.<V>transformDeferred(java.util.function.Function<
reactor.core.publisher.Flux.<V>transformDeferredContextual(java.util.function.BiFunction<
reactor.core.publisher.Flux.<V>windowUntilChanged(java.util.function.Function<
reactor.core.publisher.Flux.<V>windowUntilChanged(java.util.function.Function<
reactor.core.publisher.Flux.<X>dematerialize
reactor.core.publisher.Flux.all(java.util.function.Predicate<
reactor.core.publisher.Flux.any(java.util.function.Predicate<
reactor.core.publisher.Flux.blockFirst
reactor.core.publisher.Flux.blockFirst
reactor.core.publisher.Flux.blockLast
reactor.core.publisher.Flux.blockLast
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer
reactor.core.publisher.Flux.buffer(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.bufferTimeout
reactor.core.publisher.Flux.bufferTimeout
reactor.core.publisher.Flux.bufferTimeout
reactor.core.publisher.Flux.bufferTimeout
reactor.core.publisher.Flux.bufferUntil(java.util.function.Predicate<
reactor.core.publisher.Flux.bufferUntil(java.util.function.Predicate<
reactor.core.publisher.Flux.bufferUntilChanged
reactor.core.publisher.Flux.bufferWhile(java.util.function.Predicate<
reactor.core.publisher.Flux.cache
reactor.core.publisher.Flux.cache
reactor.core.publisher.Flux.cache
reactor.core.publisher.Flux.cache
reactor.core.publisher.Flux.cache
reactor.core.publisher.Flux.cache
reactor.core.publisher.Flux.cancelOn
reactor.core.publisher.Flux.checkpoint
reactor.core.publisher.Flux.checkpoint
reactor.core.publisher.Flux.checkpoint
reactor.core.publisher.Flux.collectList
reactor.core.publisher.Flux.collectSortedList
reactor.core.publisher.Flux.collectSortedList(java.util.Comparator<
reactor.core.publisher.Flux.concatWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.concatWithValues
reactor.core.publisher.Flux.contextCapture
reactor.core.publisher.Flux.contextWrite
reactor.core.publisher.Flux.contextWrite
reactor.core.publisher.Flux.count
reactor.core.publisher.Flux.defaultIfEmpty
reactor.core.publisher.Flux.delayElements
reactor.core.publisher.Flux.delayElements
reactor.core.publisher.Flux.delaySequence
reactor.core.publisher.Flux.delaySequence
reactor.core.publisher.Flux.delaySubscription
reactor.core.publisher.Flux.delaySubscription
reactor.core.publisher.Flux.delayUntil(java.util.function.Function<
reactor.core.publisher.Flux.distinct
reactor.core.publisher.Flux.distinctUntilChanged
reactor.core.publisher.Flux.doAfterTerminate
reactor.core.publisher.Flux.doFinally
reactor.core.publisher.Flux.doFirst
reactor.core.publisher.Flux.doOnCancel
reactor.core.publisher.Flux.doOnComplete
reactor.core.publisher.Flux.doOnEach(java.util.function.Consumer<
reactor.core.publisher.Flux.doOnError(java.util.function.Consumer<
reactor.core.publisher.Flux.doOnError(java.util.function.Predicate<
reactor.core.publisher.Flux.doOnNext(java.util.function.Consumer<
reactor.core.publisher.Flux.doOnRequest
reactor.core.publisher.Flux.doOnSubscribe(java.util.function.Consumer<
reactor.core.publisher.Flux.doOnTerminate
reactor.core.publisher.Flux.elapsed
reactor.core.publisher.Flux.elapsed
reactor.core.publisher.Flux.elementAt
reactor.core.publisher.Flux.elementAt
reactor.core.publisher.Flux.expand(java.util.function.Function<
reactor.core.publisher.Flux.expand(java.util.function.Function<
reactor.core.publisher.Flux.expandDeep(java.util.function.Function<
reactor.core.publisher.Flux.expandDeep(java.util.function.Function<
reactor.core.publisher.Flux.filter(java.util.function.Predicate<
reactor.core.publisher.Flux.filterWhen(java.util.function.Function<
reactor.core.publisher.Flux.filterWhen(java.util.function.Function<
reactor.core.publisher.Flux.getPrefetch
reactor.core.publisher.Flux.hasElement
reactor.core.publisher.Flux.hasElements
reactor.core.publisher.Flux.hide
reactor.core.publisher.Flux.ignoreElements
reactor.core.publisher.Flux.index
reactor.core.publisher.Flux.interval
reactor.core.publisher.Flux.interval
reactor.core.publisher.Flux.interval
reactor.core.publisher.Flux.interval
reactor.core.publisher.Flux.last
reactor.core.publisher.Flux.last
reactor.core.publisher.Flux.limitRate
reactor.core.publisher.Flux.limitRate
reactor.core.publisher.Flux.limitRequest
reactor.core.publisher.Flux.log
reactor.core.publisher.Flux.log
reactor.core.publisher.Flux.log
reactor.core.publisher.Flux.log
reactor.core.publisher.Flux.log
reactor.core.publisher.Flux.log
reactor.core.publisher.Flux.materialize
reactor.core.publisher.Flux.mergeComparingWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.mergeOrderedWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.mergeWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.metrics
reactor.core.publisher.Flux.name
reactor.core.publisher.Flux.next
reactor.core.publisher.Flux.onBackpressureBuffer
reactor.core.publisher.Flux.onBackpressureBuffer
reactor.core.publisher.Flux.onBackpressureBuffer(int,java.util.function.Consumer<
reactor.core.publisher.Flux.onBackpressureBuffer(int,java.util.function.Consumer<
reactor.core.publisher.Flux.onBackpressureBuffer
reactor.core.publisher.Flux.onBackpressureBuffer(java.time.Duration,int,java.util.function.Consumer<
reactor.core.publisher.Flux.onBackpressureBuffer(java.time.Duration,int,java.util.function.Consumer<
reactor.core.publisher.Flux.onBackpressureDrop
reactor.core.publisher.Flux.onBackpressureDrop(java.util.function.Consumer<
reactor.core.publisher.Flux.onBackpressureError
reactor.core.publisher.Flux.onBackpressureLatest
reactor.core.publisher.Flux.onErrorComplete
reactor.core.publisher.Flux.onErrorComplete(java.lang.Class<
reactor.core.publisher.Flux.onErrorComplete(java.util.function.Predicate<
reactor.core.publisher.Flux.onErrorContinue
reactor.core.publisher.Flux.onErrorMap(java.util.function.Function<
reactor.core.publisher.Flux.onErrorMap(java.util.function.Predicate<
reactor.core.publisher.Flux.onErrorResume(java.util.function.Function<
reactor.core.publisher.Flux.onErrorResume(java.util.function.Predicate<
reactor.core.publisher.Flux.onErrorReturn
reactor.core.publisher.Flux.onErrorReturn(java.util.function.Predicate<
reactor.core.publisher.Flux.onErrorStop
reactor.core.publisher.Flux.onTerminateDetach
reactor.core.publisher.Flux.or(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.parallel
reactor.core.publisher.Flux.parallel
reactor.core.publisher.Flux.parallel
reactor.core.publisher.Flux.publish
reactor.core.publisher.Flux.publish
reactor.core.publisher.Flux.publishNext
reactor.core.publisher.Flux.publishOn
reactor.core.publisher.Flux.publishOn
reactor.core.publisher.Flux.publishOn
reactor.core.publisher.Flux.range
reactor.core.publisher.Flux.reduce
reactor.core.publisher.Flux.repeat
reactor.core.publisher.Flux.repeat
reactor.core.publisher.Flux.repeat
reactor.core.publisher.Flux.repeat
reactor.core.publisher.Flux.repeatWhen(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>,
reactor.core.publisher.Flux.replay
reactor.core.publisher.Flux.replay
reactor.core.publisher.Flux.replay
reactor.core.publisher.Flux.replay
reactor.core.publisher.Flux.replay
reactor.core.publisher.Flux.replay
reactor.core.publisher.Flux.retry
reactor.core.publisher.Flux.retry
reactor.core.publisher.Flux.retryWhen
reactor.core.publisher.Flux.sample
reactor.core.publisher.Flux.sampleFirst
reactor.core.publisher.Flux.scan
reactor.core.publisher.Flux.share
reactor.core.publisher.Flux.shareNext
reactor.core.publisher.Flux.single
reactor.core.publisher.Flux.single
reactor.core.publisher.Flux.singleOrEmpty
reactor.core.publisher.Flux.skip
reactor.core.publisher.Flux.skip
reactor.core.publisher.Flux.skip
reactor.core.publisher.Flux.skipLast
reactor.core.publisher.Flux.skipUntil(java.util.function.Predicate<
reactor.core.publisher.Flux.skipUntilOther(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.skipWhile(java.util.function.Predicate<
reactor.core.publisher.Flux.sort
reactor.core.publisher.Flux.sort(java.util.Comparator<
reactor.core.publisher.Flux.startWith
reactor.core.publisher.Flux.startWith(java.lang.Iterable<
reactor.core.publisher.Flux.startWith(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.subscribe
reactor.core.publisher.Flux.subscribe(java.util.function.Consumer<
reactor.core.publisher.Flux.subscribe(java.util.function.Consumer<
reactor.core.publisher.Flux.subscribe(java.util.function.Consumer<
reactor.core.publisher.Flux.subscribe(java.util.function.Consumer<
reactor.core.publisher.Flux.subscribe(java.util.function.Consumer<
reactor.core.publisher.Flux.subscribe(org.reactivestreams.Subscriber<
reactor.core.publisher.Flux.subscribe(reactor.core.CoreSubscriber<
reactor.core.publisher.Flux.subscribeOn
reactor.core.publisher.Flux.subscribeOn
reactor.core.publisher.Flux.switchIfEmpty(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.tag
reactor.core.publisher.Flux.take
reactor.core.publisher.Flux.take
reactor.core.publisher.Flux.take
reactor.core.publisher.Flux.take
reactor.core.publisher.Flux.takeLast
reactor.core.publisher.Flux.takeUntil(java.util.function.Predicate<
reactor.core.publisher.Flux.takeUntilOther(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.takeWhile(java.util.function.Predicate<
reactor.core.publisher.Flux.tap
reactor.core.publisher.Flux.tap
reactor.core.publisher.Flux.tap(reactor.core.observability.SignalListenerFactory<T,
reactor.core.publisher.Flux.then
reactor.core.publisher.Flux.thenEmpty
reactor.core.publisher.Flux.timed
reactor.core.publisher.Flux.timed
reactor.core.publisher.Flux.timeout
reactor.core.publisher.Flux.timeout(java.time.Duration,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.timeout(java.time.Duration,org.reactivestreams.Publisher<
reactor.core.publisher.Flux.timeout
reactor.core.publisher.Flux.timestamp
reactor.core.publisher.Flux.timestamp
reactor.core.publisher.Flux.toIterable
reactor.core.publisher.Flux.toIterable
reactor.core.publisher.Flux.toIterable
reactor.core.publisher.Flux.toStream
reactor.core.publisher.Flux.toStream
reactor.core.publisher.Flux.toString
reactor.core.publisher.Flux.window
reactor.core.publisher.Flux.window
reactor.core.publisher.Flux.window
reactor.core.publisher.Flux.window
reactor.core.publisher.Flux.window
reactor.core.publisher.Flux.window
reactor.core.publisher.Flux.window(org.reactivestreams.Publisher<
reactor.core.publisher.Flux.windowTimeout
reactor.core.publisher.Flux.windowTimeout
reactor.core.publisher.Flux.windowTimeout
reactor.core.publisher.Flux.windowTimeout
reactor.core.publisher.Flux.windowUntil
reactor.core.publisher.Flux.windowUntil
reactor.core.publisher.Flux.windowUntil
reactor.core.publisher.Flux.windowUntilChanged
reactor.core.publisher.Flux.windowWhile
reactor.core.publisher.Flux.windowWhile
Constructor Details
PollerFlux
public PollerFlux(Duration pollInterval, Function<PollingContext<T>,Mono<T>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono> fetchResultOperation)
Creates PollerFlux.
Parameters:
pollInterval
- the polling interval
activationOperation
- the activation operation to activate (start) the long-running operation. This
operation will be invoked at most once across all subscriptions. This parameter is required. If there is no
specific activation work to be done then invocation should return Mono.empty(), this operation will be called
with a new
PollingContext<T> .
pollOperation
- the operation to poll the current state of long-running operation. This parameter is
required and the operation will be called with current
PollingContext<T> .
cancelOperation
- a
Function that represents the operation to cancel the long-running operation if
service supports cancellation. This parameter is required. If service does not support cancellation then the
implementer should return
Mono#error with an error message indicating absence of cancellation support. The
operation will be called with current
PollingContext<T> .
fetchResultOperation
- a
Function that represents the operation to retrieve final result of the
long-running operation if service support it. This parameter is required and operation will be called with the
current
PollingContext<T> . If service does not have an api to fetch final result and if final result is same
as final poll response value then implementer can choose to simply return value from provided final poll
response.
Method Details
<T,U>create
public static PollerFlux<T,U> <T,U>create(Duration pollInterval, Function<PollingContext<T>,Mono<PollResponse<T>>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono> fetchResultOperation)
Creates PollerFlux.
This method differs from the PollerFlux constructor in that the constructor uses an activationOperation which returns a Mono that emits result, the create method uses an activationOperation which returns a Mono that emits PollResponse<T> . The PollResponse<T> holds the result. If the PollResponse<T> from the activationOperation indicate that long-running operation is completed then the pollOperation will not be called.
Parameters:
pollInterval
- the polling interval
activationOperation
- the activation operation to activate (start) the long-running operation. This
operation will be invoked at most once across all subscriptions. This parameter is required. If there is no
specific activation work to be done then invocation should return Mono.empty(), this operation will be called
with a new
PollingContext<T> .
pollOperation
- the operation to poll the current state of long-running operation. This parameter is
required and the operation will be called with current
PollingContext<T> .
cancelOperation
- a
Function that represents the operation to cancel the long-running operation if
service supports cancellation. This parameter is required. If service does not support cancellation then the
implementer should return
Mono#error with an error message indicating absence of cancellation support.
The operation will be called with current
PollingContext<T> .
fetchResultOperation
- a
Function that represents the operation to retrieve final result of the
long-running operation if service support it. This parameter is required and operation will be called current
PollingContext<T> . If service does not have an api to fetch final result and if final result is same as
final poll response value then implementer can choose to simply return value from provided final poll response.
Returns:
PollerFlux
<T,U>create
public static PollerFlux<T,U> <T,U>create(Duration pollInterval, Supplier<Mono>> initialOperation, PollingStrategy<T,U> strategy, TypeReference<T> pollResponseType, TypeReference resultType)
Creates PollerFlux.
This method uses a PollingStrategy<T,U> to poll the status of a long-running operation after the activation operation is invoked. See PollingStrategy<T,U> for more details of known polling strategies and how to create a custom strategy.
Parameters:
pollInterval
- the polling interval
initialOperation
- the activation operation to activate (start) the long-running operation. This operation
will be invoked at most once across all subscriptions. This parameter is required. If there is no specific
activation work to be done then invocation should return Mono.empty(), this operation will be called with a new
PollingContext<T> .
strategy
- a known strategy for polling a long-running operation in Azure
pollResponseType
- the
TypeReference<T> of the response type from a polling call, or BinaryData if raw
response body should be kept. This should match the generic parameter
U .
resultType
- the
TypeReference<T> of the final result object to deserialize into, or BinaryData if raw
response body should be kept. This should match the generic parameter
U .
Returns:
PollerFlux
<T,U>error
public static PollerFlux<T,U> <T,U>error(Exception ex)
Creates a PollerFlux instance that returns an error on subscription.
Parameters:
Returns:
A poller flux instance that returns an error without emitting any data.
getPollInterval
public Duration getPollInterval()
Returns the current polling duration for this PollerFlux<T,U> instance.
Returns:
The current polling duration.
getSyncPoller
public SyncPoller<T,U> getSyncPoller()
Gets a synchronous blocking poller.
Returns:
a synchronous blocking poller.
setPollInterval
public PollerFlux<T,U> setPollInterval(Duration pollInterval)
Sets the poll interval for this poller. The new interval will be used for all subsequent polling operations including the subscriptions that are already in progress.
Parameters:
pollInterval
- The new poll interval for this poller.
Returns:
subscribe
public void subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Overrides:
PollerFlux<T,U>.subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Parameters:
actual