PollerFlux<T,U> Class

  • java.lang.Object
    • reactor.core.publisher.Flux
      • com.azure.core.util.polling.PollerFlux<T,U>

Type Parameters

T

The type of poll response value.

U

The type of the final result of the long-running operation.

public final class PollerFlux<T,U>
extends reactor.core.publisher.Flux<AsyncPollResponse<T,U>>

A Flux that simplifies the task of executing long-running operations against an Azure service. A subscription to PollerFlux<T,U> initiates a long-running operation and polls the status until it completes.
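The poll-until-complete behavior described above can be pictured with a small JDK-only sketch. This is a conceptual illustration, not how PollerFlux is implemented: the `PollingLoopSketch` class, its `Status` enum, and the `pollUntilComplete` helper are hypothetical names, and the loop blocks the calling thread, whereas PollerFlux is reactive and non-blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PollingLoopSketch {
    /** Hypothetical stand-in for a long-running operation status. */
    public enum Status { IN_PROGRESS, SUCCESSFULLY_COMPLETED, FAILED }

    /**
     * Calls the poll operation repeatedly, waiting pollIntervalMillis between calls,
     * until it reports a status other than IN_PROGRESS.
     * Returns every observed status in order; the last entry is the terminal one.
     */
    public static List<Status> pollUntilComplete(Supplier<Status> pollOperation, long pollIntervalMillis) {
        List<Status> observed = new ArrayList<>();
        while (true) {
            Status status = pollOperation.get();
            observed.add(status);
            if (status != Status.IN_PROGRESS) {
                return observed;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Polling was interrupted", e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The simulated service reports IN_PROGRESS twice, then completes.
        List<Status> seen = pollUntilComplete(
            () -> ++calls[0] < 3 ? Status.IN_PROGRESS : Status.SUCCESSFULLY_COMPLETED, 10);
        System.out.println(seen); // prints [IN_PROGRESS, IN_PROGRESS, SUCCESSFULLY_COMPLETED]
    }
}
```

Each entry in the returned list plays the role of one emitted poll response, and the final entry corresponds to the terminal response.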

Code samples

Instantiating and subscribing to PollerFlux

LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMillis(800));

 // Create poller instance
 PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
     (context) -> Mono.empty(),
     // Define your custom poll operation
     (context) ->  {
         if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
             System.out.println("Returning intermediate response.");
             return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
                     "Operation in progress."));
         } else {
             System.out.println("Returning final response.");
             return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                     "Operation completed."));
         }
     },
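     // Cancel operation: the first argument is the PollingContext, the second is the activation response
     (context, activationResponse) -> Mono.error(new RuntimeException("Cancellation is not supported")),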
     (context) -> Mono.just("Final Output"));

 // Listen to poll responses
 poller.subscribe(response -> {
     // Process poll response
     System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
 });
 // Do something else

Asynchronously wait for polling to complete and then retrieve the final result

LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));

 // Create poller instance
 PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
     (context) -> Mono.empty(),
     (context) ->  {
         if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
             System.out.println("Returning intermediate response.");
             return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
                     "Operation in progress."));
         } else {
             System.out.println("Returning final response.");
             return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                     "Operation completed."));
         }
     },
     (context, activationResponse) -> Mono.just("FromServer:OperationIsCancelled"),
     (context) -> Mono.just("FromServer:FinalOutput"));

 poller.take(Duration.ofMinutes(30))
         .last()
         .flatMap(asyncPollResponse -> {
             if (asyncPollResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
                 // operation completed successfully, retrieving final result.
                 return asyncPollResponse
                         .getFinalResult();
             } else {
                 return Mono.error(new RuntimeException("polling completed unsuccessfully with status: "
                         + asyncPollResponse.getStatus()));
             }
         }).block();

Block for polling to complete and then retrieve the final result

AsyncPollResponse<String, String> terminalResponse = pollerFlux.blockLast();
 System.out.printf("Polling complete. Final Status: %s%n", terminalResponse.getStatus());
 if (terminalResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
     String finalResult = terminalResponse.getFinalResult().block();
     System.out.printf("Polling complete. Final Result: %s%n", finalResult);
 }

Asynchronously poll until poller receives matching status

final Predicate<AsyncPollResponse<String, String>> isComplete = response -> {
     return response.getStatus() != LongRunningOperationStatus.IN_PROGRESS
         && response.getStatus() != LongRunningOperationStatus.NOT_STARTED;
 };

 pollerFlux
     .takeUntil(isComplete)
     .subscribe(completed -> {
         System.out.println("Completed poll response, status: " + completed.getStatus());
     });

Asynchronously cancel the long running operation

LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));

 // Create poller instance
 PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
     (context) -> Mono.empty(),
     (context) ->  {
         if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
             System.out.println("Returning intermediate response.");
             return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
                     "Operation in progress."));
         } else {
             System.out.println("Returning final response.");
             return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                     "Operation completed."));
         }
     },
     (context, activationResponse) -> Mono.just("FromServer:OperationIsCancelled"),
     (context) -> Mono.just("FromServer:FinalOutput"));

 // Asynchronously wait up to 30 minutes for polling to complete; if it has not
 // completed within that time, cancel the server operation.
 poller.take(Duration.ofMinutes(30))
         .last()
         .flatMap(asyncPollResponse -> {
             if (!asyncPollResponse.getStatus().isComplete()) {
                 return asyncPollResponse
                         .cancelOperation()
                         .then(Mono.error(new RuntimeException("Operation is cancelled!")));
             } else {
                 return Mono.just(asyncPollResponse);
             }
         }).block();

Instantiating and subscribing to PollerFlux from a known polling strategy

// Create poller instance
 PollerFlux<BinaryData, String> poller = PollerFlux.create(
     Duration.ofMillis(100),
     // pass in your custom activation operation
     () -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
         HttpMethod.POST,
         "http://httpbin.org"),
         202,
         new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
         null)),
     new OperationResourcePollingStrategy<>(new HttpPipelineBuilder().build()),
     TypeReference.createInstance(BinaryData.class),
     TypeReference.createInstance(String.class));

 // Listen to poll responses
 poller.subscribe(response -> {
     // Process poll response
     System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
 });
 // Do something else

Instantiating and subscribing to PollerFlux from a custom polling strategy

// Create custom polling strategy based on OperationResourcePollingStrategy
 PollingStrategy<BinaryData, String> strategy = new OperationResourcePollingStrategy<BinaryData, String>(
         new HttpPipelineBuilder().build()) {
     // override any interface method to customize the polling behavior
     @Override
     public Mono<PollResponse<BinaryData>> poll(PollingContext<BinaryData> context,
                                                TypeReference<BinaryData> pollResponseType) {
         return Mono.just(new PollResponse<>(
             LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
             BinaryData.fromString("")));
     }
 };

 // Create poller instance
 PollerFlux<BinaryData, String> poller = PollerFlux.create(
     Duration.ofMillis(100),
     // pass in your custom activation operation
     () -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
         HttpMethod.POST,
         "http://httpbin.org"),
         202,
         new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
         null)),
     strategy,
     TypeReference.createInstance(BinaryData.class),
     TypeReference.createInstance(String.class));

 // Listen to poll responses
 poller.subscribe(response -> {
     // Process poll response
     System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
 });
 // Do something else

Constructor Summary

Constructor Description
PollerFlux(Duration pollInterval, Function<PollingContext<T>,Mono<T>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono<U>> fetchResultOperation)

Creates PollerFlux.

Method Summary

Modifier and Type Method and Description
static <T,U> PollerFlux<T,U> create(Duration pollInterval, Function<PollingContext<T>,Mono<PollResponse<T>>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono<U>> fetchResultOperation)

Creates PollerFlux.

static <T,U> PollerFlux<T,U> create(Duration pollInterval, Supplier<Mono<? extends Response<?>>> initialOperation, PollingStrategy<T,U> strategy, TypeReference<T> pollResponseType, TypeReference<U> resultType)

Creates PollerFlux.

static <T,U> PollerFlux<T,U> error(Exception ex)

Creates a PollerFlux instance that returns an error on subscription.
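As a minimal sketch of how `error(Exception ex)` might be used, the example below returns a failed poller and turns the error signal into a message. It assumes azure-core and reactor-core are on the classpath; the `PollerFluxErrorExample` class and `describeFailure` method are hypothetical names, and the exact exception surfaced to the subscriber is treated as an assumption rather than asserted.

```java
import com.azure.core.util.polling.PollerFlux;
import reactor.core.publisher.Mono;

public class PollerFluxErrorExample {
    /** Subscribes to a poller created with PollerFlux.error and reports what happened. */
    public static String describeFailure() {
        // A poller that signals an error as soon as it is subscribed to.
        PollerFlux<String, String> failed =
            PollerFlux.error(new IllegalStateException("activation rejected"));

        return failed
            .map(response -> "unexpected response: " + response.getStatus())
            // Convert the error signal into a value so that blockLast() does not throw.
            .onErrorResume(error -> Mono.just("failed: " + error.getMessage()))
            .blockLast();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

This can be convenient when a method must return a PollerFlux but argument validation fails before any service call is made.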

Duration getPollInterval()

Returns the current polling duration for this PollerFlux<T,U> instance.

SyncPoller<T,U> getSyncPoller()

Gets a synchronous blocking poller.
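A minimal sketch of obtaining and using the synchronous poller, assuming azure-core and reactor-core are on the classpath. The `SyncPollerExample` class and `newPoller` helper are hypothetical, and the poll operation is a stand-in that reports completion on the first poll.

```java
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollerFlux;
import com.azure.core.util.polling.SyncPoller;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class SyncPollerExample {
    /** Builds a poller whose stand-in poll operation completes immediately. */
    public static PollerFlux<String, String> newPoller() {
        return new PollerFlux<>(Duration.ofMillis(10),
            context -> Mono.empty(),
            context -> Mono.just(new PollResponse<>(
                LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "Operation completed.")),
            (context, activationResponse) ->
                Mono.error(new RuntimeException("Cancellation is not supported")),
            context -> Mono.just("Final Output"));
    }

    public static void main(String[] args) {
        SyncPoller<String, String> syncPoller = newPoller().getSyncPoller();

        // Blocks the calling thread until the operation reaches a terminal status.
        PollResponse<String> terminal = syncPoller.waitForCompletion();
        System.out.println("Status: " + terminal.getStatus());

        // Blocks until the final result is available.
        System.out.println("Result: " + syncPoller.getFinalResult());
    }
}
```

The synchronous poller suits code that is not written reactively; it should not be called from a thread that must not block.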

PollerFlux<T,U> setPollInterval(Duration pollInterval)

Sets the poll interval for this poller.
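A short sketch of changing and reading back the poll interval, assuming azure-core and reactor-core are on the classpath. The `PollIntervalExample` class and `adjustInterval` method are hypothetical names, and the operations passed to the constructor are placeholders.

```java
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollerFlux;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class PollIntervalExample {
    /** Creates a poller with a 100 ms interval, changes it to 2 s, and returns the new value. */
    public static Duration adjustInterval() {
        PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
            context -> Mono.empty(),
            context -> Mono.just(new PollResponse<>(
                LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "Operation completed.")),
            (context, activationResponse) ->
                Mono.error(new RuntimeException("Cancellation is not supported")),
            context -> Mono.just("Final Output"));

        // setPollInterval returns the same PollerFlux, so calls can be chained.
        return poller.setPollInterval(Duration.ofSeconds(2)).getPollInterval();
    }

    public static void main(String[] args) {
        System.out.println(adjustInterval());
    }
}
```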

void subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)

Methods inherited from java.lang.Object

Methods inherited from reactor.core.publisher.Flux

all, any, as, blockFirst, blockLast, buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, bufferWhile, cache, cancelOn, cast, checkpoint, collect, collectList, collectMap, collectMultimap, collectSortedList, combineLatest, concat, concatDelayError, concatMap, concatMapDelayError, concatMapIterable, concatWith, concatWithValues, contextCapture, contextWrite, count, create, defaultIfEmpty, defer, deferContextual, delayElements, delaySequence, delaySubscription, delayUntil, dematerialize, distinct, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elementAt, empty, error, expand, expandDeep, filter, filterWhen, first, firstWithSignal, firstWithValue, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, generate, getPrefetch, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, interval, join, just, last, limitRate, limitRequest, log, map, mapNotNull, materialize, merge, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrderedWith, mergePriority, mergePriorityDelayError, mergeSequential, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorComplete, onErrorContinue, onErrorMap, onErrorResume, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, publish, publishNext, publishOn, push, range, reduce, reduceWith, repeat, repeatWhen, replay, retry, retryWhen, sample, sampleFirst, sampleTimeout, scan, scanWith, share, shareNext, single, singleOrEmpty, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, startWith, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchOnFirst, switchOnNext, tag, take, takeLast, takeUntil, takeUntilOther, takeWhile, tap, then, thenEmpty, thenMany, timed, timeout, timestamp, toIterable, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, usingWhen, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile, withLatestFrom, zip, zipWith, zipWithIterable

Constructor Details

PollerFlux

public PollerFlux(Duration pollInterval, Function<PollingContext<T>,Mono<T>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono<U>> fetchResultOperation)

Creates PollerFlux.

Parameters:

pollInterval - the polling interval
activationOperation - the activation operation to activate (start) the long-running operation. This operation will be invoked at most once across all subscriptions. This parameter is required. If there is no specific activation work to be done, the operation should return Mono.empty(). This operation will be called with a new PollingContext<T>.
pollOperation - the operation to poll the current state of the long-running operation. This parameter is required, and the operation will be called with the current PollingContext<T>.
cancelOperation - a BiFunction that represents the operation to cancel the long-running operation if the service supports cancellation. This parameter is required. If the service does not support cancellation, the implementer should return Mono#error with an error message indicating the absence of cancellation support. The operation will be called with the current PollingContext<T>.
fetchResultOperation - a Function that represents the operation to retrieve the final result of the long-running operation if the service supports it. This parameter is required, and the operation will be called with the current PollingContext<T>. If the service does not have an API to fetch the final result and the final result is the same as the final poll response value, the implementer can simply return the value from the provided final poll response.
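
The last point about fetchResultOperation can be sketched as follows. This is a minimal illustration, not a recommended implementation: the class name FetchResultFromLastPoll, the three-poll simulation, and the string values are assumptions made for the example. It relies on PollingContext.getLatestResponse() to read the final poll response.

```java
import com.azure.core.util.polling.AsyncPollResponse;
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollerFlux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; only the PollerFlux API calls come from azure-core.
public class FetchResultFromLastPoll {

    // Builds a poller whose simulated operation completes on the third poll.
    static PollerFlux<String, String> newPoller() {
        AtomicInteger polls = new AtomicInteger();
        return new PollerFlux<>(Duration.ofMillis(10),
            // No activation work is needed in this sketch.
            context -> Mono.empty(),
            context -> {
                LongRunningOperationStatus status = polls.incrementAndGet() < 3
                    ? LongRunningOperationStatus.IN_PROGRESS
                    : LongRunningOperationStatus.SUCCESSFULLY_COMPLETED;
                String value = status.isComplete() ? "done" : "working";
                return Mono.just(new PollResponse<>(status, value));
            },
            (context, activationResponse) ->
                Mono.error(new UnsupportedOperationException("Cancellation is not supported")),
            // No dedicated result API is assumed: reuse the value of the final poll response.
            context -> Mono.just(context.getLatestResponse().getValue()));
    }

    // Waits for the terminal poll response, then fetches the final result.
    static String finalResult() {
        return newPoller().last().flatMap(AsyncPollResponse::getFinalResult).block();
    }

    public static void main(String[] args) {
        System.out.println("Final result: " + finalResult());
    }
}
```

Because fetchResultOperation receives the same PollingContext<T> that the polling loop updates, no extra state has to be shared between the two lambdas.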

Method Details

<T,U>create

public static PollerFlux<T,U> <T,U>create(Duration pollInterval, Function<PollingContext<T>,Mono<PollResponse<T>>> activationOperation, Function<PollingContext<T>,Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>,PollResponse<T>,Mono<T>> cancelOperation, Function<PollingContext<T>,Mono<U>> fetchResultOperation)

Creates PollerFlux.

This method differs from the PollerFlux constructor in what its activationOperation returns: the constructor takes an activationOperation that returns a Mono emitting the result, whereas the create method takes an activationOperation that returns a Mono emitting a PollResponse<T>, which holds the result. If the PollResponse<T> from the activationOperation indicates that the long-running operation is completed, the pollOperation will not be called.

Parameters:

pollInterval - the polling interval
activationOperation - the activation operation to activate (start) the long-running operation. This operation will be invoked at most once across all subscriptions. This parameter is required. If there is no specific activation work to be done, the operation should return Mono.empty(). This operation will be called with a new PollingContext<T>.
pollOperation - the operation to poll the current state of the long-running operation. This parameter is required, and the operation will be called with the current PollingContext<T>.
cancelOperation - a BiFunction that represents the operation to cancel the long-running operation if the service supports cancellation. This parameter is required. If the service does not support cancellation, the implementer should return Mono#error with an error message indicating the absence of cancellation support. The operation will be called with the current PollingContext<T>.
fetchResultOperation - a Function that represents the operation to retrieve the final result of the long-running operation if the service supports it. This parameter is required, and the operation will be called with the current PollingContext<T>. If the service does not have an API to fetch the final result and the final result is the same as the final poll response value, the implementer can simply return the value from the provided final poll response.

Returns:

PollerFlux
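
The short-circuit described above, where a completed activation response skips polling, might look like the sketch below. The class name ActivationCompletes, the POLL_CALLS counter, and the string values are assumptions introduced only to make the behavior observable.

```java
import com.azure.core.util.polling.AsyncPollResponse;
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollerFlux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; only the PollerFlux API calls come from azure-core.
public class ActivationCompletes {

    // Counts how many times the poll operation is invoked.
    static final AtomicInteger POLL_CALLS = new AtomicInteger();

    static PollerFlux<String, String> newPoller() {
        return PollerFlux.create(Duration.ofMillis(10),
            // The activation already reports a terminal status.
            context -> Mono.just(new PollResponse<>(
                LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "finished during activation")),
            // Per the description above, this should not run for this poller.
            context -> {
                POLL_CALLS.incrementAndGet();
                return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS, "polling"));
            },
            (context, activationResponse) ->
                Mono.error(new UnsupportedOperationException("Cancellation is not supported")),
            context -> Mono.just("Final Output"));
    }

    public static void main(String[] args) {
        AsyncPollResponse<String, String> last = newPoller().blockLast();
        System.out.printf("Status: %s, Value: %s, poll calls: %d%n",
            last.getStatus(), last.getValue(), POLL_CALLS.get());
    }
}
```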

<T,U>create

public static PollerFlux<T,U> <T,U>create(Duration pollInterval, Supplier<Mono<? extends Response<?>>> initialOperation, PollingStrategy<T,U> strategy, TypeReference<T> pollResponseType, TypeReference<U> resultType)

Creates PollerFlux.

This method uses a PollingStrategy<T,U> to poll the status of a long-running operation after the activation operation is invoked. See PollingStrategy<T,U> for more details of known polling strategies and how to create a custom strategy.

Parameters:

pollInterval - the polling interval
initialOperation - the activation operation to activate (start) the long-running operation. This operation will be invoked at most once across all subscriptions. This parameter is required. If there is no specific activation work to be done, the operation should return Mono.empty(). This operation will be called with a new PollingContext<T>.
strategy - a known strategy for polling a long-running operation in Azure
pollResponseType - the TypeReference<T> of the response type from a polling call, or BinaryData if the raw response body should be kept. This should match the generic parameter T.
resultType - the TypeReference<U> of the final result object to deserialize into, or BinaryData if the raw response body should be kept. This should match the generic parameter U.

Returns:

PollerFlux
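
As a rough illustration of the two type arguments, the sketch below builds a TypeReference for BinaryData (raw body kept) and one for a parameterized result type. The class name PollTypeReferences and the choice of Map<String, String> as the result type are assumptions for the example; the initialOperation and strategy arguments are omitted because they depend on a configured HTTP pipeline.

```java
import com.azure.core.util.BinaryData;
import com.azure.core.util.serializer.TypeReference;

import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical example class; only the TypeReference API calls come from azure-core.
public class PollTypeReferences {

    // T = BinaryData: keep the raw body of each poll response.
    static final TypeReference<BinaryData> POLL_RESPONSE_TYPE =
        TypeReference.createInstance(BinaryData.class);

    // U = Map<String, String>: an anonymous subclass captures the generic type.
    static final TypeReference<Map<String, String>> RESULT_TYPE =
        new TypeReference<Map<String, String>>() { };

    public static void main(String[] args) {
        Type pollType = POLL_RESPONSE_TYPE.getJavaType();
        Type resultType = RESULT_TYPE.getJavaType();
        System.out.println("pollResponseType -> " + pollType.getTypeName());
        System.out.println("resultType -> " + resultType.getTypeName());
    }
}
```

With these two references, the poller would be declared as PollerFlux<BinaryData, Map<String, String>>, so each TypeReference lines up with its own generic parameter.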

<T,U>error

public static PollerFlux<T,U> <T,U>error(Exception ex)

Creates a PollerFlux instance that returns an error on subscription.

Parameters:

ex - The exception to be returned on subscription of this PollerFlux<T,U>.

Returns:

A poller flux instance that returns an error without emitting any data.
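
A minimal sketch of how such an error poller behaves on subscription is shown below. The class name ErrorPollerExample and the exception message are assumptions; the example captures the error with onErrorResume rather than letting block() rethrow it.

```java
import com.azure.core.util.polling.PollerFlux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example class; only PollerFlux.error comes from azure-core.
public class ErrorPollerExample {

    // Subscribes to an error poller and returns the error it signals.
    static Throwable subscribeAndCapture() {
        PollerFlux<String, String> poller =
            PollerFlux.error(new IllegalStateException("operation could not be started"));

        AtomicReference<Throwable> seen = new AtomicReference<>();
        poller.then()
            // Record the error instead of propagating it to block().
            .onErrorResume(error -> {
                seen.set(error);
                return Mono.empty();
            })
            .block();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("Subscription failed with: " + subscribeAndCapture());
    }
}
```

This can be useful when a client method must return a PollerFlux but argument validation fails before the operation can start.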

getPollInterval

public Duration getPollInterval()

Returns the current polling duration for this PollerFlux<T,U> instance.

Returns:

The current polling duration.

getSyncPoller

public SyncPoller<T,U> getSyncPoller()

Gets a synchronous blocking poller.

Returns:

a synchronous blocking poller.
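
For callers that prefer blocking code, usage might look like the following sketch. The class name SyncPollerExample, the two-poll simulation, and the string values are assumptions for illustration.

```java
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollerFlux;
import com.azure.core.util.polling.SyncPoller;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; only the poller API calls come from azure-core.
public class SyncPollerExample {

    // Builds a poller whose simulated operation completes on the second poll.
    static PollerFlux<String, String> newPoller() {
        AtomicInteger polls = new AtomicInteger();
        return new PollerFlux<>(Duration.ofMillis(10),
            context -> Mono.empty(),
            context -> Mono.just(new PollResponse<>(
                polls.incrementAndGet() < 2
                    ? LongRunningOperationStatus.IN_PROGRESS
                    : LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                "poll response")),
            (context, activationResponse) ->
                Mono.error(new UnsupportedOperationException("Cancellation is not supported")),
            context -> Mono.just("Final Output"));
    }

    public static void main(String[] args) {
        SyncPoller<String, String> syncPoller = newPoller().getSyncPoller();

        // Blocks the calling thread until a terminal status is reached.
        PollResponse<String> terminal = syncPoller.waitForCompletion();
        System.out.println("Terminal status: " + terminal.getStatus());

        // Blocks until the final result is available.
        System.out.println("Result: " + syncPoller.getFinalResult());
    }
}
```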

setPollInterval

public PollerFlux<T,U> setPollInterval(Duration pollInterval)

Sets the poll interval for this poller. The new interval will be used for all subsequent polling operations including the subscriptions that are already in progress.

Parameters:

pollInterval - The new poll interval for this poller.

Returns:

The updated instance of PollerFlux<T,U>.
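
Since the method returns the same poller, the interval can be adjusted fluently and read back with getPollInterval. The sketch below assumes a hypothetical class name PollIntervalExample and arbitrary durations.

```java
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollerFlux;
import reactor.core.publisher.Mono;

import java.time.Duration;

// Hypothetical example class; only the PollerFlux API calls come from azure-core.
public class PollIntervalExample {

    // Builds a poller with an initial interval of one second.
    static PollerFlux<String, String> newPoller() {
        return new PollerFlux<>(Duration.ofSeconds(1),
            context -> Mono.empty(),
            context -> Mono.just(new PollResponse<>(
                LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "done")),
            (context, activationResponse) ->
                Mono.error(new UnsupportedOperationException("Cancellation is not supported")),
            context -> Mono.just("Final Output"));
    }

    public static void main(String[] args) {
        PollerFlux<String, String> poller = newPoller();
        System.out.println("Initial interval: " + poller.getPollInterval());

        // The setter returns the poller itself, so the call can be chained.
        PollerFlux<String, String> updated = poller.setPollInterval(Duration.ofMillis(250));
        System.out.println("Same instance: " + (updated == poller));
        System.out.println("New interval: " + poller.getPollInterval());
    }
}
```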

subscribe

public void subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)

Overrides:

reactor.core.publisher.Flux.subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)

Parameters:

actual

Applies to